Skip to content

Commit 1942e32

Browse files
committed
feat: add StreamConfig password file implementation
test: add StreamConfigTest password file
1 parent d7b2709 commit 1942e32

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.gotocompany.dagger.core.source.config.models.SourceName;
1818
import com.gotocompany.dagger.core.source.config.models.TimeRangePool;
1919
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
20+
import org.apache.commons.lang3.StringUtils;
2021
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
2122

2223
import com.google.gson.Gson;
@@ -27,7 +28,10 @@
2728
import lombok.Getter;
2829
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
2930

31+
import java.io.IOException;
3032
import java.io.StringReader;
33+
import java.nio.file.Files;
34+
import java.nio.file.Paths;
3135
import java.util.Map;
3236
import java.util.Objects;
3337
import java.util.Properties;
@@ -59,6 +63,10 @@ public class StreamConfig {
5963
@Getter
6064
private String sslKeystorePassword;
6165

66+
@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION)
67+
@Getter
68+
private String sslKeystorePasswordFileLocation;
69+
6270
@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE_KEY)
6371
@Getter
6472
@JsonAdapter(value = DaggerSSLKeyStoreFileTypeAdaptor.class)
@@ -77,6 +85,10 @@ public class StreamConfig {
7785
@Getter
7886
private String sslTruststorePassword;
7987

88+
@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION)
89+
@Getter
90+
private String sslTruststorePasswordFileLocation;
91+
8092
@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE_KEY)
8193
@Getter
8294
@JsonAdapter(value = DaggerSSLTrustStoreFileTypeAdaptor.class)
@@ -217,6 +229,7 @@ public Properties getKafkaProps(Configuration configuration) {
217229
.filter(e -> e.getKey().toLowerCase().startsWith(KAFKA_PREFIX))
218230
.forEach(e -> kafkaProps.setProperty(parseVarName(e.getKey(), KAFKA_PREFIX), e.getValue()));
219231
setAdditionalKafkaConsumerConfigs(kafkaProps, configuration);
232+
setSslPasswords(kafkaProps);
220233
return kafkaProps;
221234
}
222235

@@ -247,4 +260,21 @@ public OffsetsInitializer getStartingOffset() {
247260
private OffsetResetStrategy getOffsetResetStrategy() {
248261
return OffsetResetStrategy.valueOf(autoOffsetReset.toUpperCase());
249262
}
263+
264+
private void setSslPasswords(Properties kafkaProps) {
265+
if (StringUtils.isNotEmpty(sslTruststorePasswordFileLocation)) {
266+
kafkaProps.setProperty(KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslTruststorePasswordFileLocation));
267+
}
268+
if (StringUtils.isNotEmpty(sslKeystorePasswordFileLocation)) {
269+
kafkaProps.setProperty(KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, parsePasswordFile(sslKeystorePasswordFileLocation));
270+
}
271+
}
272+
273+
private String parsePasswordFile(String path) {
274+
try {
275+
return new String(Files.readAllBytes(Paths.get(path)));
276+
} catch (IOException e) {
277+
throw new IllegalArgumentException("Error reading password file: " + path, e);
278+
}
279+
}
250280
}

dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ public class Constants {
8282
public static final String SINK_KAFKA_MAX_REQUEST_SIZE_KEY = "max.request.size";
8383
public static final String SINK_KAFKA_MAX_REQUEST_SIZE_DEFAULT = "20971520";
8484
public static final String SINK_KAFKA_LINGER_MS_DEFAULT = "0";
85+
public static final String SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY = "SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY";
86+
public static final String SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY = "SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY";
87+
88+
public static final String KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY = "ssl.truststore.password";
89+
public static final String KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY = "ssl.keystore.password";
8590

8691
public static final String ES_TYPE = "ES";
8792
public static final String HTTP_TYPE = "HTTP";
@@ -133,9 +138,11 @@ public class Constants {
133138
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD";
134139
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION";
135140
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD";
141+
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION";
136142
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE";
137143
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION";
138144
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD";
145+
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION";
139146
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE";
140147
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL";
141148

dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
import org.junit.Rule;
1313
import org.junit.Test;
1414
import org.junit.rules.ExpectedException;
15+
import org.junit.rules.TemporaryFolder;
1516
import org.mockito.Mock;
1617

18+
import java.io.File;
19+
import java.io.FileWriter;
20+
import java.io.IOException;
1721
import java.util.HashMap;
1822
import java.util.List;
1923
import java.util.Properties;
@@ -31,6 +35,9 @@ public class StreamConfigTest {
3135
@Rule
3236
public ExpectedException thrown = ExpectedException.none();
3337

38+
@Rule
39+
public TemporaryFolder temporaryFolder = new TemporaryFolder();
40+
3441
@Mock
3542
private Configuration configuration;
3643

@@ -121,18 +128,17 @@ public void shouldParseKafkaPropertiesWithSSLConfigurations() {
121128
kafkaPropMap.put("bootstrap.servers", "localhost:9092");
122129
kafkaPropMap.put("auto.offset.reset", "latest");
123130
kafkaPropMap.put("auto.commit.enable", "");
124-
kafkaPropMap.put("ssl.keystore.password", "test-keystore-pass");
131+
kafkaPropMap.put(Constants.KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, "test-keystore-pass");
125132
kafkaPropMap.put("ssl.keystore.type", "JKS");
126133
kafkaPropMap.put("ssl.keystore.location", "test-keystore-location");
127134
kafkaPropMap.put("ssl.protocol", "SSL");
128135
kafkaPropMap.put("ssl.key.password", "test-key-pass");
129136
kafkaPropMap.put("ssl.truststore.type", "JKS");
130137
kafkaPropMap.put("ssl.truststore.location", "test-truststore-location");
131-
kafkaPropMap.put("ssl.truststore.password", "test-truststore-pass");
138+
kafkaPropMap.put(Constants.KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, "test-truststore-pass");
132139
kafkaPropMap.put("security.protocol", "SSL");
133140

134141

135-
136142
Properties properties = new Properties();
137143
properties.putAll(kafkaPropMap);
138144

@@ -247,6 +253,20 @@ public void shouldParseMultipleAdditionalConsumerConfigs() {
247253
assertEquals("1000", secondStreamProperties.getProperty("offset.flush.interval.ms"));
248254
}
249255

256+
@Test
257+
public void shouldParseSslPasswordsConfig() throws IOException {
258+
File keystorePassword = writeDummyPasswordFile("ssl-keystore-password.txt", "keystore-password");
259+
File truststorePassword = writeDummyPasswordFile("ssl-truststore-password.txt", "truststore-password");
260+
when(configuration.getString(INPUT_STREAMS, ""))
261+
.thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION\": \"%s\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", keystorePassword.getAbsolutePath(), truststorePassword.getAbsolutePath()));
262+
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
263+
264+
Properties streamProperties = streamConfigs[0].getKafkaProps(configuration);
265+
266+
assertEquals("keystore-password", streamProperties.getProperty("ssl.keystore.password"));
267+
assertEquals("truststore-password", streamProperties.getProperty("ssl.truststore.password"));
268+
}
269+
250270
@Test(expected = IllegalArgumentException.class)
251271
public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefix() {
252272
String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]";
@@ -546,4 +566,13 @@ public void shouldTrimLeadingAndTrailingWhitespacesFromParquetFilePathsWhenParqu
546566

547567
Assert.assertArrayEquals(new String[]{"gs://some-parquet-path", "gs://another-parquet-path"}, streamConfigs[0].getParquetFilePaths());
548568
}
569+
570+
private File writeDummyPasswordFile(String fileName, String password) throws IOException {
571+
File passwordFile = temporaryFolder.newFile(fileName);
572+
FileWriter writer = new FileWriter(passwordFile);
573+
writer.write(password);
574+
writer.close();
575+
return passwordFile;
576+
}
577+
549578
}

0 commit comments

Comments
 (0)