Skip to content

Commit 8eab900

Browse files
feat: add http sink builder
1 parent 5fabca4 commit 8eab900

File tree

1 file changed

+241
-0
lines changed

1 file changed

+241
-0
lines changed
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package com.gotocompany.dagger.core.sink.http;
2+
3+
import com.gotocompany.dagger.common.configuration.Configuration;
4+
import com.gotocompany.dagger.common.core.StencilClientOrchestrator;
5+
import com.gotocompany.dagger.common.serde.proto.serialization.ProtoSerializer;
6+
import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
7+
import com.gotocompany.dagger.core.utils.Constants;
8+
import org.apache.flink.api.java.utils.ParameterTool;
9+
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
import java.util.logging.Logger;
13+
import java.net.URL;
14+
import java.net.MalformedURLException;
15+
16+
public class HttpSinkBuilder {
17+
private static final Logger LOGGER = Logger.getLogger(HttpSinkBuilder.class.getName());
18+
19+
private String[] columnNames;
20+
private StencilClientOrchestrator stencilClientOrchestrator;
21+
private Configuration configuration;
22+
private DaggerStatsDReporter daggerStatsDReporter;
23+
private String endpoint;
24+
private int maxRetries;
25+
private long retryBackoffMs;
26+
private int connectTimeoutMs;
27+
private int readTimeoutMs;
28+
private String authType;
29+
private String authToken;
30+
private boolean compressionEnabled;
31+
private String compressionType;
32+
private int batchSize;
33+
private long flushIntervalMs;
34+
private boolean asyncEnabled;
35+
private int maxConnections;
36+
private boolean validateSslCertificate;
37+
private String proxyHost;
38+
private int proxyPort;
39+
40+
private HttpSinkBuilder() {
41+
this.maxRetries = 3;
42+
this.retryBackoffMs = 1000;
43+
this.connectTimeoutMs = 5000;
44+
this.readTimeoutMs = 30000;
45+
this.authType = "None";
46+
this.compressionEnabled = false;
47+
this.compressionType = "gzip";
48+
this.batchSize = 100;
49+
this.flushIntervalMs = 1000;
50+
this.asyncEnabled = true;
51+
this.maxConnections = 10;
52+
this.validateSslCertificate = true;
53+
}
54+
55+
public static HttpSinkBuilder create() {
56+
return new HttpSinkBuilder();
57+
}
58+
59+
public HttpSink build() {
60+
validateConfiguration();
61+
ProtoSerializer protoSerializer = createProtoSerializer();
62+
Configuration conf = setDefaultValues(configuration);
63+
return new HttpSink(conf, protoSerializer, daggerStatsDReporter);
64+
}
65+
66+
private ProtoSerializer createProtoSerializer() {
67+
return new ProtoSerializer(
68+
configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS", ""),
69+
configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", ""),
70+
columnNames,
71+
stencilClientOrchestrator);
72+
}
73+
74+
private Configuration setDefaultValues(Configuration inputConf) {
75+
Map<String, String> configMap = new HashMap<>(inputConf.getParam().toMap());
76+
configMap.put("SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH", "false");
77+
configMap.put("SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS", "86400000");
78+
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES", "4");
79+
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS", "5000");
80+
configMap.put("SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY", "LONG_POLLING");
81+
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS", "60000");
82+
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS", "");
83+
configMap.put("SINK_METRICS_APPLICATION_PREFIX", "dagger_");
84+
configMap.put(Constants.SINK_HTTP_ENDPOINT_KEY, endpoint);
85+
configMap.put(Constants.SINK_HTTP_MAX_RETRIES, String.valueOf(maxRetries));
86+
configMap.put(Constants.SINK_HTTP_RETRY_BACKOFF_MS, String.valueOf(retryBackoffMs));
87+
configMap.put(Constants.SINK_HTTP_CONNECT_TIMEOUT_MS, String.valueOf(connectTimeoutMs));
88+
configMap.put(Constants.SINK_HTTP_READ_TIMEOUT_MS, String.valueOf(readTimeoutMs));
89+
configMap.put(Constants.SINK_HTTP_AUTH_TYPE, authType);
90+
configMap.put(Constants.SINK_HTTP_AUTH_TOKEN, authToken);
91+
configMap.put(Constants.SINK_HTTP_COMPRESSION_ENABLED, String.valueOf(compressionEnabled));
92+
configMap.put(Constants.SINK_HTTP_COMPRESSION_TYPE, compressionType);
93+
configMap.put(Constants.SINK_HTTP_BATCH_SIZE, String.valueOf(batchSize));
94+
configMap.put(Constants.SINK_HTTP_FLUSH_INTERVAL_MS, String.valueOf(flushIntervalMs));
95+
configMap.put(Constants.SINK_HTTP_ASYNC_ENABLED, String.valueOf(asyncEnabled));
96+
configMap.put(Constants.SINK_HTTP_MAX_CONNECTIONS, String.valueOf(maxConnections));
97+
configMap.put(Constants.SINK_HTTP_VALIDATE_SSL_CERTIFICATE, String.valueOf(validateSslCertificate));
98+
configMap.put(Constants.SINK_HTTP_PROXY_HOST, proxyHost);
99+
configMap.put(Constants.SINK_HTTP_PROXY_PORT, String.valueOf(proxyPort));
100+
return new Configuration(ParameterTool.fromMap(configMap));
101+
}
102+
103+
private void validateConfiguration() {
104+
if (endpoint == null || endpoint.isEmpty()) {
105+
throw new IllegalArgumentException("HTTP endpoint must be set");
106+
}
107+
validateEndpointUrl(endpoint);
108+
if (batchSize <= 0) {
109+
throw new IllegalArgumentException("Batch size must be greater than 0");
110+
}
111+
if (flushIntervalMs <= 0) {
112+
throw new IllegalArgumentException("Flush interval must be greater than 0");
113+
}
114+
if (maxRetries < 0) {
115+
throw new IllegalArgumentException("Max retries must be non-negative");
116+
}
117+
if (retryBackoffMs < 0) {
118+
throw new IllegalArgumentException("Retry backoff must be non-negative");
119+
}
120+
if (connectTimeoutMs <= 0) {
121+
throw new IllegalArgumentException("Connect timeout must be greater than 0");
122+
}
123+
if (readTimeoutMs <= 0) {
124+
throw new IllegalArgumentException("Read timeout must be greater than 0");
125+
}
126+
if (maxConnections <= 0) {
127+
throw new IllegalArgumentException("Max connections must be greater than 0");
128+
}
129+
if (proxyPort < 0 || proxyPort > 65535) {
130+
throw new IllegalArgumentException("Proxy port must be between 0 and 65535");
131+
}
132+
}
133+
134+
private void validateEndpointUrl(String url) {
135+
try {
136+
new URL(url);
137+
} catch (MalformedURLException e) {
138+
throw new IllegalArgumentException("Invalid endpoint URL: " + url, e);
139+
}
140+
}
141+
142+
public HttpSinkBuilder setConfiguration(Configuration configuration) {
143+
this.configuration = configuration;
144+
return this;
145+
}
146+
147+
public HttpSinkBuilder setColumnNames(String[] columnNames) {
148+
this.columnNames = columnNames;
149+
return this;
150+
}
151+
152+
public HttpSinkBuilder setStencilClientOrchestrator(StencilClientOrchestrator stencilClientOrchestrator) {
153+
this.stencilClientOrchestrator = stencilClientOrchestrator;
154+
return this;
155+
}
156+
157+
public HttpSinkBuilder setDaggerStatsDReporter(DaggerStatsDReporter daggerStatsDReporter) {
158+
this.daggerStatsDReporter = daggerStatsDReporter;
159+
return this;
160+
}
161+
162+
public HttpSinkBuilder setEndpoint(String endpoint) {
163+
this.endpoint = endpoint;
164+
return this;
165+
}
166+
167+
public HttpSinkBuilder setMaxRetries(int maxRetries) {
168+
this.maxRetries = maxRetries;
169+
return this;
170+
}
171+
172+
public HttpSinkBuilder setRetryBackoffMs(long retryBackoffMs) {
173+
this.retryBackoffMs = retryBackoffMs;
174+
return this;
175+
}
176+
177+
public HttpSinkBuilder setConnectTimeoutMs(int connectTimeoutMs) {
178+
this.connectTimeoutMs = connectTimeoutMs;
179+
return this;
180+
}
181+
182+
public HttpSinkBuilder setReadTimeoutMs(int readTimeoutMs) {
183+
this.readTimeoutMs = readTimeoutMs;
184+
return this;
185+
}
186+
187+
public HttpSinkBuilder setAuthType(String authType) {
188+
this.authType = authType;
189+
return this;
190+
}
191+
192+
public HttpSinkBuilder setAuthToken(String authToken) {
193+
this.authToken = authToken;
194+
return this;
195+
}
196+
197+
public HttpSinkBuilder setCompressionEnabled(boolean compressionEnabled) {
198+
this.compressionEnabled = compressionEnabled;
199+
return this;
200+
}
201+
202+
public HttpSinkBuilder setCompressionType(String compressionType) {
203+
this.compressionType = compressionType;
204+
return this;
205+
}
206+
207+
public HttpSinkBuilder setBatchSize(int batchSize) {
208+
this.batchSize = batchSize;
209+
return this;
210+
}
211+
212+
public HttpSinkBuilder setFlushIntervalMs(long flushIntervalMs) {
213+
this.flushIntervalMs = flushIntervalMs;
214+
return this;
215+
}
216+
217+
public HttpSinkBuilder setAsyncEnabled(boolean asyncEnabled) {
218+
this.asyncEnabled = asyncEnabled;
219+
return this;
220+
}
221+
222+
public HttpSinkBuilder setMaxConnections(int maxConnections) {
223+
this.maxConnections = maxConnections;
224+
return this;
225+
}
226+
227+
public HttpSinkBuilder setValidateSslCertificate(boolean validateSslCertificate) {
228+
this.validateSslCertificate = validateSslCertificate;
229+
return this;
230+
}
231+
232+
public HttpSinkBuilder setProxyHost(String proxyHost) {
233+
this.proxyHost = proxyHost;
234+
return this;
235+
}
236+
237+
public HttpSinkBuilder setProxyPort(int proxyPort) {
238+
this.proxyPort = proxyPort;
239+
return this;
240+
}
241+
}

0 commit comments

Comments
 (0)