
Commit 37d19be

Merge pull request #295 from splunk/gzip-compress
Added GZIP compression support
2 parents: 0653ebf + 93f8330

File tree: 11 files changed (+187 −24 lines)


.github/workflows/ci_build_test.yaml (+2)

@@ -57,6 +57,8 @@ jobs:
   kafka_package: "kafka_2.13-2.7.1.tgz"
 - kafka_version: "2.8.0"
   kafka_package: "kafka_2.13-2.8.0.tgz"
+- kafka_version: "3.0.0"
+  kafka_package: "kafka_2.13-3.0.0.tgz"
 env:
   CI_SPLUNK_VERSION: "8.2.2"
   CI_SPLUNK_FILENAME: splunk-8.2.2-87344edfcdb4-Linux-x86_64.tgz

README.md (+1)

@@ -164,6 +164,7 @@ Use the below schema to configure Splunk Connect for Kafka
 | `splunk.hec.max.retries` | Amount of times a failed batch will attempt to resend before dropping events completely. Warning: This will result in data loss, default is `-1` which will retry indefinitely | `-1` |
 | `splunk.hec.backoff.threshhold.seconds` | The amount of time Splunk Connect for Kafka waits to attempt resending after errors from a HEC endpoint. | `60` |
 | `splunk.hec.lb.poll.interval` | Specify this parameter (in seconds) to control the polling interval (increase to do less polling, decrease to do more frequent polling) | `120` |
+| `splunk.hec.enable.compression` | Valid settings are true or false. Enables or disables gzip compression. | `false` |
 ### Acknowledgement Parameters
 #### Use Ack
 | Name | Description | Default Value |
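
For reference, a minimal connector configuration that turns the new option on could look like the snippet below. The `splunk.hec.enable.compression` key is the one added in this change; the connector class name follows the project's `com.splunk.kafka.connect` package, and the topic, HEC URI, and token values are placeholders:

name=splunk-sink
connector.class=com.splunk.kafka.connect.SplunkSinkConnector
tasks.max=1
topics=my-topic
splunk.hec.uri=https://hec.example.com:8088
splunk.hec.token=00000000-0000-0000-0000-000000000000
splunk.hec.enable.compression=true

With the flag set to true, each HTTP POST to HEC carries a gzip Content-Encoding header and a compressed body, as the Indexer change below shows.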

src/main/java/com/splunk/hecclient/EventBatch.java (+42 −10)

@@ -17,19 +17,14 @@
 
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.AbstractHttpEntity;
+import org.apache.http.entity.ContentProducer;
+import org.apache.http.entity.EntityTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.SequenceInputStream;
-
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.io.*;
+import java.util.*;
+import java.util.zip.GZIPOutputStream;
 
 public abstract class EventBatch {
     private static Logger log = LoggerFactory.getLogger(EventBatch.class);

@@ -42,6 +37,7 @@ public abstract class EventBatch {
 
     private volatile int status = INIT;
     private int failureCount = 0;
+    private boolean enableCompression;
     private long sendTimestamp = System.currentTimeMillis() / 1000; // in seconds
     protected int len;
     protected List<Event> events = new ArrayList<>();

@@ -129,6 +125,13 @@ public final HttpEntity getHttpEntity() {
         return e;
     }
 
+    public final HttpEntity getHttpEntityTemplate() {
+        AbstractHttpEntity e = new EntityTemplate(new GzipDataContentProducer());
+        e.setContentEncoding("gzip");
+        e.setContentType(getContentType());
+        return e;
+    }
+
     @Override
     public final String toString() {
         StringBuilder builder = new StringBuilder();

@@ -141,6 +144,35 @@ public final String toString() {
         return builder.toString();
     }
 
+    public boolean isEnableCompression() {
+        return enableCompression;
+    }
+
+    public void setEnableCompression(boolean enableCompression) {
+        this.enableCompression = enableCompression;
+    }
+
+    public final byte[] getDataOfBatch() throws IOException {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+            for (final Event e : events) {
+                e.writeTo(bos);
+            }
+            byte[] unCompressBytes = bos.toByteArray();
+            return unCompressBytes;
+        }
+    }
+
+    private class GzipDataContentProducer implements ContentProducer {
+
+        @Override
+        public void writeTo(OutputStream outputStream) throws IOException {
+            OutputStream out = new GZIPOutputStream(outputStream);
+            out.write(getDataOfBatch());
+            out.flush();
+            out.close();
+        }
+    }
+
     private class HttpEventBatchEntity extends AbstractHttpEntity {
         @Override
         public boolean isRepeatable() {
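
As a standalone sketch of the mechanism the new getHttpEntityTemplate() relies on (class and variable names below are illustrative, not part of the connector), an EntityTemplate backed by a ContentProducer compresses the payload only when the entity is written out, and a GZIPInputStream recovers the original bytes:

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipEntitySketch {
    public static void main(String[] args) throws IOException {
        byte[] payload = "{\"event\":\"hello world\"}\n".getBytes(StandardCharsets.UTF_8);

        // Same idea as GzipDataContentProducer: wrap the target stream in a
        // GZIPOutputStream and write the uncompressed batch bytes through it.
        ContentProducer producer = (OutputStream outputStream) -> {
            OutputStream gzip = new GZIPOutputStream(outputStream);
            gzip.write(payload);
            gzip.close();
        };

        HttpEntity entity = new EntityTemplate(producer);
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        entity.writeTo(compressed);

        // Round-trip: decompressing yields the original payload.
        GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()));
        byte[] buf = new byte[1024];
        int read = gis.read(buf, 0, buf.length);
        gis.close();
        System.out.println(new String(buf, 0, read, StandardCharsets.UTF_8));
    }
}

Because the entity streams through the producer, the compressed form is generated each time the entity is written out rather than being stored on the batch.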

src/main/java/com/splunk/hecclient/Indexer.java (+10 −7)

@@ -15,21 +15,20 @@
  */
 package com.splunk.hecclient;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.splunk.kafka.connect.VersionUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.client.methods.HttpPost;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.JsonNode;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

@@ -121,8 +120,12 @@ public boolean send(final EventBatch batch) {
         String url = baseUrl + endpoint;
         final HttpPost httpPost = new HttpPost(url);
         httpPost.setHeaders(headers);
-        httpPost.setEntity(batch.getHttpEntity());
-
+        if (batch.isEnableCompression()) {
+            httpPost.setHeader("Content-Encoding", "gzip");
+            httpPost.setEntity(batch.getHttpEntityTemplate());
+        } else {
+            httpPost.setEntity(batch.getHttpEntity());
+        }
         String resp;
         try {
             resp = executeHttpRequest(httpPost);

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java (+14 −5)

@@ -16,14 +16,17 @@
 package com.splunk.kafka.connect;
 
 import com.splunk.hecclient.HecConfig;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public final class SplunkSinkConnectorConfig extends AbstractConfig {
     // General

@@ -45,6 +48,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     static final String HEC_THREDS_CONF = "splunk.hec.threads";
     static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
     static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
+    static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
     // Acknowledgement Parameters
     // Use Ack
     static final String ACK_CONF = "splunk.hec.ack.enabled";

@@ -108,6 +112,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
             + "Socket timeout.By default, this is set to 60 seconds.";
     static final String SSL_VALIDATE_CERTIFICATES_DOC = "Valid settings are true or false. Enables or disables HTTPS "
             + "certification validation. By default, this is set to true.";
+    static final String ENABLE_COMPRESSSION_DOC = "Valid settings are true or false. Used for enable or disable gzip-compression. By default, this is set to false.";
     // Acknowledgement Parameters
     // Use Ack
     static final String ACK_DOC = "Valid settings are true or false. When set to true Splunk Connect for Kafka will "

@@ -189,6 +194,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     final int numberOfThreads;
     final int socketTimeout;
     final boolean validateCertificates;
+    final boolean enableCompression;
     final int lbPollInterval;
 
     final boolean ack;

@@ -259,6 +265,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         headerSource = getString(HEADER_SOURCE_CONF);
         headerSourcetype = getString(HEADER_SOURCETYPE_CONF);
         headerHost = getString(HEADER_HOST_CONF);
+        enableCompression = getBoolean(ENABLE_COMPRESSSION_CONF);
     }
 
     public static ConfigDef conf() {

@@ -297,7 +304,8 @@ public static ConfigDef conf() {
             .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC)
             .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC)
             .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC)
-            .define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC);
+            .define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC)
+            .define(ENABLE_COMPRESSSION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ENABLE_COMPRESSSION_DOC);
     }
 
     /**

@@ -362,6 +370,7 @@ public String toString() {
             + "headerSource:" + headerSource + ", "
             + "headerSourcetype:" + headerSourcetype + ", "
            + "headerHost:" + headerHost + ", "
+            + "enableCompression:" + enableCompression + ", "
             + "lbPollInterval:" + lbPollInterval;
     }

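A small sketch of how the new definition behaves when a worker parses connector properties (the map below is illustrative and omits the connector's other settings): a ConfigDef with the same .define(...) entry turns the string "true" into a boolean, which the connector's constructor then copies into the enableCompression field.

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.HashMap;
import java.util.Map;

public class EnableCompressionConfigSketch {
    public static void main(String[] args) {
        // Mirrors the new .define(...) entry added to conf().
        ConfigDef def = new ConfigDef()
            .define("splunk.hec.enable.compression", ConfigDef.Type.BOOLEAN, false,
                    ConfigDef.Importance.MEDIUM, "Enables or disables gzip compression.");

        Map<String, String> props = new HashMap<>();
        props.put("splunk.hec.enable.compression", "true");

        AbstractConfig config = new AbstractConfig(def, props);
        System.out.println(config.getBoolean("splunk.hec.enable.compression")); // true
    }
}

Leaving the key out of the map falls back to the declared default of false, which matches the README entry above.
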
src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java (+5)

@@ -165,10 +165,12 @@ private void handleRaw(final Collection<SinkRecord> records) {
             Map<TopicPartition, Collection<SinkRecord>> partitionedRecords = partitionRecords(records);
             for (Map.Entry<TopicPartition, Collection<SinkRecord>> entry: partitionedRecords.entrySet()) {
                 EventBatch batch = createRawEventBatch(entry.getKey());
+                batch.setEnableCompression(connectorConfig.enableCompression);
                 sendEvents(entry.getValue(), batch);
             }
         } else {
             EventBatch batch = createRawEventBatch(null);
+            batch.setEnableCompression(connectorConfig.enableCompression);
             sendEvents(records, batch);
         }
     }

@@ -193,6 +195,7 @@ private void handleRecordsWithHeader(final Collection<SinkRecord> records) {
             ArrayList<SinkRecord> recordArrayList = set.getValue();
 
             EventBatch batch = createRawHeaderEventBatch(splunkSinkRecordKey);
+            batch.setEnableCompression(connectorConfig.enableCompression);
             sendEvents(recordArrayList, batch);
         }
         log.debug("{} records have been bucketed in to {} batches", records.size(), recordsWithSameHeaders.size());

@@ -260,6 +263,7 @@ public String insertHeaderToken() {
 
     private void handleEvent(final Collection<SinkRecord> records) {
         EventBatch batch = new JsonEventBatch();
+        batch.setEnableCompression(connectorConfig.enableCompression);
         sendEvents(records, batch);
     }
 

@@ -283,6 +287,7 @@ private void sendEvents(final Collection<SinkRecord> records, EventBatch batch)
                 send(batch);
                 // start a new batch after send
                 batch = batch.createFromThis();
+                batch.setEnableCompression(connectorConfig.enableCompression);
             }
         }
 
src/test/java/com/splunk/hecclient/IndexerTest.java (+44)

@@ -194,4 +194,48 @@ private Indexer assertFailure(CloseableHttpClient client) {
         Assert.assertEquals(indexer.getChannel(), poller.getChannel());
         return indexer;
     }
+
+    @Test
+    public void sendCompressedBatchWithSuccess() {
+        for (int i = 0; i < 2; i++) {
+            CloseableHttpClientMock client = new CloseableHttpClientMock();
+            if (i == 0) {
+                client.setResponse(CloseableHttpClientMock.success);
+            }
+            PollerMock poller = new PollerMock();
+
+            Indexer indexer = new Indexer(baseUrl, token, client, poller);
+            EventBatch batch = UnitUtil.createBatch();
+            batch.setEnableCompression(true);
+            boolean result = indexer.send(batch);
+            Assert.assertTrue(result);
+            Assert.assertNotNull(poller.getBatch());
+            Assert.assertNull(poller.getFailedBatch());
+            Assert.assertNull(poller.getException());
+            Assert.assertEquals(indexer.getChannel(), poller.getChannel());
+            Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
+        }
+    }
+
+    @Test
+    public void sendCompressedRawBatchWithSuccess() {
+        for (int i = 0; i < 2; i++) {
+            CloseableHttpClientMock client = new CloseableHttpClientMock();
+            if (i == 0) {
+                client.setResponse(CloseableHttpClientMock.success);
+            }
+            PollerMock poller = new PollerMock();
+
+            Indexer indexer = new Indexer(baseUrl, token, client, poller);
+            EventBatch batch = UnitUtil.createRawEventBatch();
+            batch.setEnableCompression(true);
+            boolean result = indexer.send(batch);
+            Assert.assertTrue(result);
+            Assert.assertNotNull(poller.getBatch());
+            Assert.assertNull(poller.getFailedBatch());
+            Assert.assertNull(poller.getException());
+            Assert.assertEquals(indexer.getChannel(), poller.getChannel());
+            Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
+        }
+    }
 }

src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java (+29 −1)

@@ -19,13 +19,14 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
 
 public class JsonEvenBatchTest {
     @Test

@@ -177,4 +178,31 @@ private int readContent(final HttpEntity entity, byte[] data) {
 
         return UnitUtil.read(in, data);
     }
+
+    @Test
+    public void testGZIPCompressionForJsonEvent() {
+        EventBatch batch = new JsonEventBatch();
+        batch.setEnableCompression(true);
+        Assert.assertTrue(batch.isEnableCompression());
+        Event event = new JsonEvent("hello world! hello world! hello world!", "hao");
+        batch.add(event);
+        HttpEntity entity = batch.getHttpEntityTemplate();
+        byte[] data = new byte[1024];
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            entity.writeTo(out);
+            String expected = "{\"event\":\"hello world! hello world! hello world!\"}\n";
+            ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray());
+            GZIPInputStream gis = new GZIPInputStream(bis);
+            int read = gis.read(data, 0, data.length);
+            gis.close();
+            bis.close();
+
+            // Decode the bytes into a String
+            String ori = new String(data, 0, read, "UTF-8");
+            Assert.assertEquals(expected, ori);
+        } catch (IOException ex) {
+            Assert.assertTrue("failed to compress and decompress the data", false);
+            throw new HecException("failed to compress and decompress the data", ex);
+        }
+    }
 }

src/test/java/com/splunk/hecclient/RawEventBatchTest.java (+32)

@@ -15,10 +15,15 @@
  */
 package com.splunk.hecclient;
 
+import org.apache.http.HttpEntity;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.List;
+import java.util.zip.GZIPInputStream;
 
 public class RawEventBatchTest {
     @Test

@@ -122,4 +127,31 @@ public void checkEquals() {
 
         Assert.assertFalse(batchOne.equals(batchTwo));
     }
+
+    @Test
+    public void testGZIPCompressionForRaw() {
+        EventBatch batch = RawEventBatch.factory().build();
+        batch.setEnableCompression(true);
+        Assert.assertTrue(batch.isEnableCompression());
+        Event event = new RawEvent("hello world! hello world! hello world!", null);
+        batch.add(event);
+        HttpEntity entity = batch.getHttpEntityTemplate();
+        byte[] data = new byte[1024];
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            entity.writeTo(out);
+            String expected = "hello world! hello world! hello world!";
+            ByteArrayInputStream bis = new ByteArrayInputStream(out.toByteArray());
+            GZIPInputStream gis = new GZIPInputStream(bis);
+            int read = gis.read(data, 0, data.length);
+            gis.close();
+            bis.close();
+
+            // Decode the bytes into a String
+            String ori = new String(data, 0, read, "UTF-8");
+            Assert.assertEquals(expected, ori);
+        } catch (IOException ex) {
+            Assert.assertTrue("failed to compress and decompress the data", false);
+            throw new HecException("failed to compress and decompress the data", ex);
+        }
+    }
 }
