Skip to content

Commit 640c44a

Browse files
tests: add tests for HttpSink
1 parent 7d16a69 commit 640c44a

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.gotocompany.dagger.core.sink.http;
2+
3+
import com.gotocompany.dagger.common.configuration.Configuration;
4+
import com.gotocompany.dagger.common.serde.proto.serialization.ProtoSerializer;
5+
import com.gotocompany.dagger.core.metrics.reporters.ErrorReporter;
6+
import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
7+
import com.gotocompany.depot.Sink;
8+
import com.gotocompany.depot.http.HttpSinkFactory;
9+
import org.apache.flink.api.connector.sink.SinkWriter;
10+
import org.apache.flink.configuration.ConfigOptions;
11+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
12+
import org.junit.Before;
13+
import org.junit.Test;
14+
import org.mockito.Mock;
15+
import org.mockito.MockitoAnnotations;
16+
17+
import java.util.Collections;
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
import static org.junit.Assert.*;
22+
import static org.mockito.Mockito.*;
23+
24+
public class HttpSinkTest {
25+
26+
@Mock
27+
private Configuration configuration;
28+
@Mock
29+
private ProtoSerializer protoSerializer;
30+
@Mock
31+
private DaggerStatsDReporter daggerStatsDReporter;
32+
@Mock
33+
private HttpSinkFactory httpSinkFactory;
34+
@Mock
35+
private Sink depotSink;
36+
@Mock
37+
private SinkWriterMetricGroup metricGroup;
38+
39+
private HttpSink httpSink;
40+
41+
@Before
42+
public void setUp() {
43+
MockitoAnnotations.initMocks(this);
44+
httpSink = new HttpSink(configuration, protoSerializer, httpSinkFactory, daggerStatsDReporter);
45+
}
46+
47+
@Test
48+
public void testCreateWriter() throws Exception {
49+
Map<String, String> configMap = new HashMap<>();
50+
configMap.put("SINK_HTTP_BATCH_SIZE", "50");
51+
configMap.put("SINK_ERROR_TYPES_FOR_FAILURE", "DESERIALIZATION,HTTP_ERROR");
52+
when(configuration.getParam()).thenReturn(ConfigOptions.fromMap(configMap));
53+
when(httpSinkFactory.create()).thenReturn(depotSink);
54+
55+
SinkWriter<?, ?, ?> writer = httpSink.createWriter(new TestInitContext(), Collections.emptyList());
56+
57+
assertNotNull(writer);
58+
assertTrue(writer instanceof HttpSinkWriter);
59+
verify(httpSinkFactory).init();
60+
verify(httpSinkFactory).create();
61+
}
62+
63+
@Test
64+
public void testGetWriterStateSerializer() {
65+
assertFalse(httpSink.getWriterStateSerializer().isPresent());
66+
}
67+
68+
@Test
69+
public void testCreateCommitter() throws Exception {
70+
assertFalse(httpSink.createCommitter().isPresent());
71+
}
72+
73+
@Test
74+
public void testCreateGlobalCommitter() throws Exception {
75+
assertFalse(httpSink.createGlobalCommitter().isPresent());
76+
}
77+
78+
@Test
79+
public void testGetCommittableSerializer() {
80+
assertFalse(httpSink.getCommittableSerializer().isPresent());
81+
}
82+
83+
@Test
84+
public void testGetGlobalCommittableSerializer() {
85+
assertFalse(httpSink.getGlobalCommittableSerializer().isPresent());
86+
}
87+
88+
private class TestInitContext implements HttpSink.InitContext {
89+
@Override
90+
public SinkWriterMetricGroup metricGroup() {
91+
return metricGroup;
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)