Skip to content

Commit 03d9055

Browse files
committed
test: add test
1 parent 1942e32 commit 03d9055

File tree

3 files changed

+58
-2
lines changed

3 files changed

+58
-2
lines changed

dagger-common/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,11 @@ publishing {
139139
protobuf {
140140
generatedFilesBaseDir = "$projectDir/src/generated"
141141
protoc {
142-
artifact = "com.google.protobuf:protoc:3.1.0"
142+
artifact = "com.google.protobuf:protoc:3.17.3"
143143
}
144144
plugins {
145145
grpc {
146-
artifact = "io.grpc:protoc-gen-grpc-java:1.19.0"
146+
artifact = "io.grpc:protoc-gen-grpc-java:1.60.1"
147147
}
148148
}
149149
generateProtoTasks {

dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.gotocompany.dagger.core.sink.influx.InfluxDBSink;
1111
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
1212
import com.gotocompany.dagger.core.utils.Constants;
13+
import org.apache.commons.lang3.StringUtils;
1314
import org.apache.flink.api.connector.sink.Sink;
1415
import org.apache.flink.api.java.utils.ParameterTool;
1516
import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -24,6 +25,9 @@
2425
import com.gotocompany.dagger.core.sink.kafka.KafkaSerializerBuilder;
2526
import com.gotocompany.dagger.core.sink.log.LogSink;
2627

28+
import java.io.IOException;
29+
import java.nio.file.Files;
30+
import java.nio.file.Paths;
2731
import java.util.ArrayList;
2832
import java.util.HashMap;
2933
import java.util.List;
@@ -117,6 +121,7 @@ protected Properties getProducerProperties(Configuration configuration) {
117121
.map(ParameterTool::getProperties)
118122
.orElseGet(Properties::new));
119123
kafkaProducerConfigs.putAll(dynamicProperties);
124+
setSslPasswordsFromFile(configuration, kafkaProducerConfigs);
120125
return kafkaProducerConfigs;
121126
}
122127

@@ -136,4 +141,23 @@ public Map<String, List<String>> getTelemetry() {
136141
private void addMetric(String key, String value) {
137142
metrics.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
138143
}
144+
145+
private void setSslPasswordsFromFile(Configuration configuration, Properties kafkaProps) {
146+
String sslTruststorePasswordFileLocation = configuration.getString(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY, StringUtils.EMPTY);
147+
if (StringUtils.isNotEmpty(sslTruststorePasswordFileLocation)) {
148+
kafkaProps.setProperty(Constants.KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslTruststorePasswordFileLocation));
149+
}
150+
String sslKeystorePasswordFileLocation = configuration.getString(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY, StringUtils.EMPTY);
151+
if (StringUtils.isNotEmpty(sslKeystorePasswordFileLocation)) {
152+
kafkaProps.setProperty(Constants.KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslKeystorePasswordFileLocation));
153+
}
154+
}
155+
156+
private String parsePasswordFile(String path) {
157+
try {
158+
return new String(Files.readAllBytes(Paths.get(path)));
159+
} catch (IOException e) {
160+
throw new IllegalArgumentException("Error reading password file: " + path, e);
161+
}
162+
}
139163
}

dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,38 @@ public void shouldParseSslPasswordsConfig() throws IOException {
267267
assertEquals("truststore-password", streamProperties.getProperty("ssl.truststore.password"));
268268
}
269269

270+
@Test
271+
public void shouldOverrideExplicitSslPasswordConfig() throws IOException {
272+
File keystorePassword = writeDummyPasswordFile("ssl-keystore-password.txt", "keystore-password");
273+
File truststorePassword = writeDummyPasswordFile("ssl-truststore-password.txt", "truststore-password");
274+
when(configuration.getString(INPUT_STREAMS, ""))
275+
.thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD\": \"password\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION\": \"%s\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", keystorePassword.getAbsolutePath(), truststorePassword.getAbsolutePath()));
276+
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
277+
278+
Properties streamProperties = streamConfigs[0].getKafkaProps(configuration);
279+
280+
assertEquals("keystore-password", streamProperties.getProperty("ssl.keystore.password"));
281+
assertEquals("truststore-password", streamProperties.getProperty("ssl.truststore.password"));
282+
}
283+
284+
@Test(expected = IllegalArgumentException.class)
285+
public void shouldThrowIllegalArgumentExceptionWhenKeystorePasswordFileNotExists() {
286+
when(configuration.getString(INPUT_STREAMS, ""))
287+
.thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", "non-exist.txt"));
288+
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
289+
290+
streamConfigs[0].getKafkaProps(configuration);
291+
}
292+
293+
@Test(expected = IllegalArgumentException.class)
294+
public void shouldThrowIllegalArgumentExceptionWhenTruststorePasswordFileNotExists() {
295+
when(configuration.getString(INPUT_STREAMS, ""))
296+
.thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", "non-exist.txt"));
297+
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
298+
299+
streamConfigs[0].getKafkaProps(configuration);
300+
}
301+
270302
@Test(expected = IllegalArgumentException.class)
271303
public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefix() {
272304
String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]";

0 commit comments

Comments
 (0)