12
12
import org .apache .flink .api .java .utils .ParameterTool ;
13
13
import org .junit .Assert ;
14
14
import org .junit .Before ;
15
+ import org .junit .Rule ;
15
16
import org .junit .Test ;
17
+ import org .junit .rules .TemporaryFolder ;
16
18
import org .mockito .Mock ;
17
19
20
+ import java .io .File ;
21
+ import java .io .FileWriter ;
22
+ import java .io .IOException ;
18
23
import java .util .*;
19
24
20
25
import static com .gotocompany .dagger .common .core .Constants .*;
@@ -36,6 +41,8 @@ public class SinkOrchestratorTest {
36
41
private MetricsTelemetryExporter telemetryExporter ;
37
42
@ Mock
38
43
private DaggerStatsDReporter daggerStatsDReporter ;
44
+ @ Rule
45
+ public TemporaryFolder temporaryFolder = new TemporaryFolder ();
39
46
40
47
@ Before
41
48
public void setup () {
@@ -73,14 +80,15 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception {
73
80
}
74
81
75
82
@ Test
76
- public void shouldSetKafkaProducerConfigurations () throws Exception {
83
+ public void shouldSetKafkaProducerConfigurations () {
77
84
Map <String , String > additionalParameters = new HashMap <>();
78
85
additionalParameters .put (SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS , SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE );
79
86
when (configuration .getString (eq (Constants .SINK_KAFKA_BROKERS_KEY ), anyString ())).thenReturn ("10.200.216.87:6668" );
80
87
when (configuration .getBoolean (eq (Constants .SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY ), anyBoolean ())).thenReturn (true );
81
88
when (configuration .getString (eq (Constants .SINK_KAFKA_LINGER_MS_KEY ), anyString ())).thenReturn ("1000" );
82
89
when (configuration .getParam ()).thenReturn (ParameterTool .fromMap (additionalParameters ));
83
90
when (configuration .getString (eq (SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS ), anyString ())).thenReturn (SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE );
91
+
84
92
Properties producerProperties = sinkOrchestrator .getProducerProperties (configuration );
85
93
86
94
assertEquals (producerProperties .getProperty ("compression.type" ), "snappy" );
@@ -89,6 +97,38 @@ public void shouldSetKafkaProducerConfigurations() throws Exception {
89
97
assertEquals (producerProperties .getProperty ("sasl.login.callback.handler.class" ), SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE );
90
98
}
91
99
100
+ @ Test
101
+ public void shouldParseSslPasswords () throws IOException {
102
+ File trustStorePasswordFile = writeDummyPasswordFile ("truststore-password.txt" , "truststore-password" );
103
+ File keyStorePasswordFile = writeDummyPasswordFile ("keystore-password.txt" , "keystore-password" );
104
+ when (configuration .getString (eq (Constants .SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY ), anyString ())).thenReturn (trustStorePasswordFile .getAbsolutePath ());
105
+ when (configuration .getString (eq (Constants .SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY ), anyString ())).thenReturn (keyStorePasswordFile .getAbsolutePath ());
106
+ when (configuration .getString (eq (Constants .SINK_KAFKA_BROKERS_KEY ), anyString ())).thenReturn ("10.200.216.87:6668" );
107
+ when (configuration .getBoolean (eq (Constants .SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY ), anyBoolean ())).thenReturn (true );
108
+ when (configuration .getString (eq (Constants .SINK_KAFKA_LINGER_MS_KEY ), anyString ())).thenReturn ("1000" );
109
+
110
+ Properties producerProperties = sinkOrchestrator .getProducerProperties (configuration );
111
+
112
+ assertEquals (producerProperties .getProperty ("ssl.truststore.password" ), "truststore-password" );
113
+ assertEquals (producerProperties .getProperty ("ssl.keystore.password" ), "keystore-password" );
114
+ }
115
+
116
+ @ Test
117
+ public void shouldThr () throws IOException {
118
+ File trustStorePasswordFile = writeDummyPasswordFile ("truststore-password.txt" , "truststore-password" );
119
+ File keyStorePasswordFile = writeDummyPasswordFile ("keystore-password.txt" , "keystore-password" );
120
+ when (configuration .getString (eq (Constants .SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY ), anyString ())).thenReturn (trustStorePasswordFile .getAbsolutePath ());
121
+ when (configuration .getString (eq (Constants .SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY ), anyString ())).thenReturn (keyStorePasswordFile .getAbsolutePath ());
122
+ when (configuration .getString (eq (Constants .SINK_KAFKA_BROKERS_KEY ), anyString ())).thenReturn ("10.200.216.87:6668" );
123
+ when (configuration .getBoolean (eq (Constants .SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY ), anyBoolean ())).thenReturn (true );
124
+ when (configuration .getString (eq (Constants .SINK_KAFKA_LINGER_MS_KEY ), anyString ())).thenReturn ("1000" );
125
+
126
+ Properties producerProperties = sinkOrchestrator .getProducerProperties (configuration );
127
+
128
+ assertEquals (producerProperties .getProperty ("ssl.truststore.password" ), "truststore-password" );
129
+ assertEquals (producerProperties .getProperty ("ssl.keystore.password" ), "keystore-password" );
130
+ }
131
+
92
132
@ Test
93
133
public void shouldThrowIllegalArgumentExceptionForInvalidLingerMs () throws Exception {
94
134
when (configuration .getString (eq (Constants .SINK_KAFKA_BROKERS_KEY ), anyString ())).thenReturn ("10.200.216.87:6668" );
@@ -119,4 +159,12 @@ public void shouldReturnBigQuerySink() {
119
159
Sink sinkFunction = sinkOrchestrator .getSink (configuration , new String []{}, stencilClientOrchestrator , daggerStatsDReporter );
120
160
assertThat (sinkFunction , instanceOf (BigQuerySink .class ));
121
161
}
162
+
163
+ private File writeDummyPasswordFile (String fileName , String password ) throws IOException {
164
+ File passwordFile = temporaryFolder .newFile (fileName );
165
+ FileWriter writer = new FileWriter (passwordFile );
166
+ writer .write (password );
167
+ writer .close ();
168
+ return passwordFile ;
169
+ }
122
170
}
0 commit comments