From 40f778cf31a9e57f02c6b15565b9c488c4a87b94 Mon Sep 17 00:00:00 2001
From: Raul Agepati <35212250+ragepati@users.noreply.github.com>
Date: Wed, 15 Dec 2021 08:23:54 +0530
Subject: [PATCH 01/17] CCLOG-1308: Bump connector version to address log4j CVE
(#8) (#9)
* CCLOG-1210: Enable Jenkins
* CCLOG-1210: Validations for Splunk access (#5)
* CCLOG-1226: Remove unnecessary logs (#6)
* CCLOG-1274: Avoid logging config, data by default (#7)
* CCLOG-1308: Bump connector version to address log4j CVE
---
Jenkinsfile | 9 +
pom.xml | 46 +++--
.../com/splunk/hecclient/HecAckPoller.java | 4 +-
.../java/com/splunk/hecclient/Indexer.java | 2 +-
.../com/splunk/hecclient/ResponsePoller.java | 4 +-
.../kafka/connect/KafkaRecordTracker.java | 2 +-
.../kafka/connect/SplunkSinkConnector.java | 117 ++++++++++++-
.../splunk/kafka/connect/SplunkSinkTask.java | 5 +-
src/main/resources/version.properties | 4 +-
.../connect/SplunkSinkConnecterTest.java | 158 +++++++++++++++++-
10 files changed, 317 insertions(+), 34 deletions(-)
create mode 100644 Jenkinsfile
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 00000000..eea6de55
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,9 @@
+#!/usr/bin/env groovy
+/*
+ * Copyright [2021 - 2021] Confluent Inc.
+ */
+common {
+ slackChannel = '#connect-warn'
+ nodeLabel = 'docker-debian-jdk8'
+ downStreamValidate = false
+}
diff --git a/pom.xml b/pom.xml
index 064c78a4..277ae8d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,17 +3,21 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
+
+ io.confluent
+ connect-plugins-parent
+ 0.6.8
+
com.github.splunk.kafka.connect
splunk-kafka-connect
- v2.0.5
+ v2.0.5.1
splunk-kafka-connect
UTF-8
1.8
1.8
- 1.8
4.13.2
5.3.2
5.3.2
@@ -152,6 +156,12 @@
3.7
compile
+
+ org.mockito
+ mockito-core
+ 2.23.4
+ test
+
@@ -202,19 +212,25 @@
org.apache.maven.plugins
maven-checkstyle-plugin
- 2.17
-
-
- validate
- validate
-
- google_checks.xml
-
-
- check
-
-
-
+
+ true
+
+
+
+ com.github.spotbugs
+ spotbugs-maven-plugin
+
+ true
+
+
+
+ com.mycila
+ license-maven-plugin
+
+
+ **
+
+
diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java
index a0931bc3..2ee3f527 100644
--- a/src/main/java/com/splunk/hecclient/HecAckPoller.java
+++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java
@@ -132,7 +132,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
}
if (resp.getText() == "Invalid data format") {
- log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString());
+ log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} batch={}", channel, channel.getIndexer(), batch.getUUID());
batch.commit();
List committedBatches = new ArrayList<>();
committedBatches.add(batch);
@@ -316,7 +316,7 @@ private void findAndRemoveTimedoutBatches(Map batches, List batches) {
- log.debug("received acked event batches={}", batches);
+ log.debug("received acked event batches={}", batches.size());
/* Loop all *assigned* partitions to find the lowest consecutive
* HEC-commited offsets. A batch could contain events coming from a
* variety of topic/partitions, and scanning those events coulb be
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index 70865868..bae92dab 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -15,19 +15,30 @@
*/
package com.splunk.kafka.connect;
-import com.splunk.kafka.connect.VersionUtils;
+import com.splunk.hecclient.Hec;
+import com.splunk.hecclient.HecConfig;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class SplunkSinkConnector extends SinkConnector {
+public class SplunkSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
private Map taskConfig;
@@ -66,4 +77,100 @@ public String version() {
public ConfigDef config() {
return SplunkSinkConnectorConfig.conf();
}
+
+ @Override
+ public Config validate(Map connectorConfigs) {
+ Config config = super.validate(connectorConfigs);
+ SplunkSinkConnectorConfig connectorConfig;
+ try {
+ connectorConfig = new SplunkSinkConnectorConfig(connectorConfigs);
+ } catch (Exception e) {
+ log.warn("Validating configuration caught an exception", e);
+ return config;
+ }
+
+ Map configValues =
+ config.configValues()
+ .stream()
+ .collect(Collectors.toMap(
+ ConfigValue::name,
+ Function.identity()));
+
+ validateAccess(connectorConfig, configValues);
+
+ return config;
+ }
+
+ /**
+ * We validate access by posting an empty payload to the Splunk endpoint.
+ *
+ * For a valid endpoint and a valid token, this returns a HTTP 400 Bad Request with the
+ * payload: {"text":"No data","code":5}
+ * For a valid endpoint and an invalid token, this returns a HTTP 403 Forbidden with the
+ * payload: {"text":"Invalid token","code":4}
+ * For an invalid hostname and other errors, the Java UnknownHostException and other similar
+ * Exceptions are thrown.
+ *
+ * @param connectorConfig The connector configuration
+ * @param configValues The configuration ConfigValues
+ */
+ private void validateAccess(SplunkSinkConnectorConfig connectorConfig, Map configValues) {
+ HecConfig hecConfig = connectorConfig.getHecConfig();
+
+ try (CloseableHttpClient httpClient = createHttpClient(hecConfig)) {
+ List uris = hecConfig.getUris();
+
+ Map validationFailedIndexers = new LinkedHashMap<>();
+
+ for (String uri : uris) {
+ log.trace("Validating " + uri);
+ HttpPost request = new HttpPost(uri + "/services/collector");
+ request.setEntity(new StringEntity(""));
+
+ request.addHeader(HttpHeaders.AUTHORIZATION, String.format("Splunk %s", hecConfig.getToken()));
+
+ int status = -1;
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ status = response.getStatusLine().getStatusCode();
+ if (status == 400) {
+ log.trace("Validation succeeded for indexer {}", uri);
+ } else if (status == 403) {
+ log.trace("Invalid HEC token for indexer {}", uri);
+ validationFailedIndexers.put(uri, response.getStatusLine().toString());
+ } else {
+ log.trace("Validation failed for {}", uri);
+ validationFailedIndexers.put(uri, response.getStatusLine().toString());
+ }
+ } catch (Exception e) {
+ log.error("Caught exception while validating", e);
+ validationFailedIndexers.put(uri, e.getMessage());
+ }
+ }
+
+ if (!validationFailedIndexers.isEmpty()) {
+ log.trace("Validation failed: " + validationFailedIndexers);
+ recordErrors(configValues,
+ "Validation Failed: " + validationFailedIndexers,
+ SplunkSinkConnectorConfig.URI_CONF,
+ SplunkSinkConnectorConfig.TOKEN_CONF);
+ }
+ } catch (IOException e) {
+ log.error("Configuration validation error", e);
+ recordErrors(configValues,
+ "Configuration validation error: " + e.getMessage(),
+ SplunkSinkConnectorConfig.URI_CONF,
+ SplunkSinkConnectorConfig.TOKEN_CONF);
+ }
+ }
+
+ // Enables mocking during testing
+ CloseableHttpClient createHttpClient(final HecConfig config) {
+ return Hec.createHttpClient(config);
+ }
+
+ void recordErrors(Map configValues, String message, String...keys) {
+ for (String key : keys) {
+ configValues.get(key).addErrorMessage(message);
+ }
+ }
}
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 82307fb5..2e2af396 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -65,8 +65,6 @@ public void start(Map taskConfig) {
if(connectorConfig.flushWindow > 0) {
flushWindow = connectorConfig.flushWindow * 1000; // Flush window set to user configured value (Multiply by 1000 as all the calculations are done in milliseconds)
}
-
- log.info("kafka-connect-splunk task starts with config={}", connectorConfig);
}
@Override
@@ -366,7 +364,6 @@ public void stop() {
if (hec != null) {
hec.close();
}
- log.info("kafka-connect-splunk task ends with config={}", connectorConfig);
}
@Override
@@ -404,7 +401,7 @@ private Event createHecEventFrom(final SinkRecord record) {
event.setTied(record);
event.addFields(connectorConfig.enrichments);
} catch(Exception e) {
- log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
+ log.trace("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
event = createHECEventNonFormatted(record);
}
} else {
diff --git a/src/main/resources/version.properties b/src/main/resources/version.properties
index bff7da4f..c300f0f3 100644
--- a/src/main/resources/version.properties
+++ b/src/main/resources/version.properties
@@ -1,3 +1,3 @@
githash=
-gitbranch=release/2.0.x
-gitversion=v2.0.5
+gitbranch=2.0.5.x
+gitversion=v2.0.5.1
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 666f3b37..5ba2d16e 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -15,17 +15,82 @@
*/
package com.splunk.kafka.connect;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
-import java.util.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
class SplunkSinkConnecterTest {
+ private String SPLUNK_URI_HOST1 = "https://www.host1.com:1111/";
+
+ @Mock
+ CloseableHttpResponse validHttpResponse;
+
+ @Mock
+ CloseableHttpResponse unauthorizedHttpResponse;
+
+ @Mock
+ CloseableHttpResponse notFoundHttpResponse;
+
+ @Spy
+ SplunkSinkConnector connector;
+
+ @Spy
+ CloseableHttpClient httpClient;
+
+ @BeforeEach
+ void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(httpClient)
+ .when(connector)
+ .createHttpClient(anyObject());
+
+ when(validHttpResponse.getStatusLine()).
+ thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_BAD_REQUEST, "Bad Request"));
+ when(validHttpResponse.getEntity())
+ .thenReturn(new StringEntity("{\"text\":\"No data\",\"code\":5}", ContentType.APPLICATION_JSON));
+
+ when(unauthorizedHttpResponse.getStatusLine())
+ .thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_FORBIDDEN, "Unauthorized"));
+ when(unauthorizedHttpResponse.getEntity())
+ .thenReturn(new StringEntity("{\"text\":\"Invalid token\",\"code\":4}", ContentType.APPLICATION_JSON));
+
+ when(notFoundHttpResponse.getStatusLine()).
+ thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_NOT_FOUND, "Not Found"));
+ when(notFoundHttpResponse.getEntity())
+ .thenReturn(new StringEntity("Not Found", ContentType.APPLICATION_XHTML_XML));
+ }
+
@Test
void startStop() {
SinkConnector connector = new SplunkSinkConnector();
@@ -55,4 +120,93 @@ public void config() {
ConfigDef config = connector.config();
Assert.assertNotNull(config);
}
-}
\ No newline at end of file
+
+ @Test
+ public void testValidationEmptyConfig() {
+ Config config = new SplunkSinkConnector().validate(new HashMap<>());
+
+ Map> errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+ }
+
+ @Test
+ public void testValidationSuccess() throws IOException {
+ Map connectorConfig = getConnectorConfig();
+
+ doReturn(validHttpResponse)
+ .when(httpClient)
+ .execute(any());
+
+ Config config = connector.validate(connectorConfig);
+ for (ConfigValue value : config.configValues()) {
+ assertTrue(value.errorMessages().isEmpty());
+ }
+ }
+
+ @Test
+ public void testValidationFailure() throws IOException {
+ Map connectorConfig = getConnectorConfig();
+
+ doReturn(unauthorizedHttpResponse)
+ .when(httpClient)
+ .execute(any());
+
+ Config config = connector.validate(connectorConfig);
+
+ Map> errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(SPLUNK_URI_HOST1));
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1));
+ }
+
+ @Test
+ public void testMultipleValidationFailure() throws IOException {
+ Map connectorConfig = getConnectorConfig();
+ String host2 = "https://www.host2.com:2222/";
+ String host3 = "https://www.host3.com:3333/";
+ connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF,
+ SPLUNK_URI_HOST1 + "," + host2 + "," + host3);
+
+ doReturn(unauthorizedHttpResponse)
+ .doThrow(new UnknownHostException("Host not found"))
+ .doReturn(notFoundHttpResponse)
+ .when(httpClient)
+ .execute(any());
+
+ Config config = connector.validate(connectorConfig);
+
+ Map> errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+
+
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(SPLUNK_URI_HOST1));
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host2));
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host3));
+
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1));
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(host2));
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(host3));
+ }
+
+ private Map getConnectorConfig() {
+ Map connectorConfig = new HashMap<>();
+
+ connectorConfig.put(SinkConnector.TOPICS_CONFIG, "topic1");
+ connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF, SPLUNK_URI_HOST1);
+ connectorConfig.put(SplunkSinkConnectorConfig.TOKEN_CONF, "token1");
+ connectorConfig.put(SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF, "false");
+
+ return connectorConfig;
+ }
+
+}
From 29245609913329db9b435e82a62150fc994a7fd1 Mon Sep 17 00:00:00 2001
From: Raul Agepati <35212250+ragepati@users.noreply.github.com>
Date: Tue, 21 Dec 2021 12:39:07 +0530
Subject: [PATCH 02/17] CCLOG-1248: Validate Splunk host and token separately
(#10)
---
.../kafka/connect/SplunkSinkConnector.java | 160 +++++++++++++-----
.../connect/SplunkSinkConnecterTest.java | 95 ++++++++---
2 files changed, 190 insertions(+), 65 deletions(-)
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index bae92dab..aa82e9b9 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -19,6 +19,7 @@
import com.splunk.hecclient.HecConfig;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -31,6 +32,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -41,6 +43,8 @@
public class SplunkSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
private Map taskConfig;
+ final private String COLLECTOR_ENDPOINT = "/services/collector";
+ final private String HEALTH_CHECK_ENDPOINT = COLLECTOR_ENDPOINT + "/health";
@Override
public void start(Map taskConfig) {
@@ -96,11 +100,81 @@ public Config validate(Map connectorConfigs) {
ConfigValue::name,
Function.identity()));
- validateAccess(connectorConfig, configValues);
+ HecConfig hecConfig = connectorConfig.getHecConfig();
+ try (CloseableHttpClient httpClient = createHttpClient(hecConfig)) {
+
+ if (!validateCollector(httpClient, hecConfig, configValues)) {
+ return config;
+ }
+
+ validateAccess(httpClient, hecConfig, configValues);
+
+ } catch (IOException e) {
+ log.error("Configuration validation error", e);
+ recordErrors(
+ configValues,
+ "Configuration validation error: " + e.getMessage(),
+ SplunkSinkConnectorConfig.TOKEN_CONF
+ );
+ }
return config;
}
+ /**
+ * We validate the collector by querying the HEC collector health.
+ *
+ * For a valid collector endpoint, this returns a HTTP 200 OK with the payload:
+ * {"text":"HEC is healthy","code":17}
+ * For an invalid hostname and other errors, the Java UnknownHostException and other similar
+ * Exceptions are thrown.
+ *
+ * @param httpClient HEC HTTP Client
+ * @param hecConfig The HEC configuration
+ * @param configValues The configuration ConfigValues
+ * @return Whether the validation was successful
+ */
+ private boolean validateCollector(
+ CloseableHttpClient httpClient,
+ HecConfig hecConfig,
+ Map configValues
+ ) {
+ List uris = hecConfig.getUris();
+
+ Map connectionFailedCollectors = new LinkedHashMap<>();
+
+ for (String uri : uris) {
+ log.trace("Connecting to " + uri);
+ HttpGet request = new HttpGet(uri + HEALTH_CHECK_ENDPOINT);
+
+ int status = -1;
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ status = response.getStatusLine().getStatusCode();
+ if (status == 200) {
+ log.trace("Connection succeeded for collector {}", uri);
+ } else {
+ log.trace("Connection failed for collector {}", uri);
+ connectionFailedCollectors.put(uri, response.getStatusLine().toString());
+ }
+ } catch (Exception e) {
+ log.error("Caught exception while connecting", e);
+ connectionFailedCollectors.put(uri, e.getMessage());
+ }
+ }
+
+ if (!connectionFailedCollectors.isEmpty()) {
+ log.trace("Connection failed: " + connectionFailedCollectors);
+ recordErrors(
+ configValues,
+ "Connection Failed: " + connectionFailedCollectors,
+ SplunkSinkConnectorConfig.URI_CONF
+ );
+ return false;
+ }
+
+ return true;
+ }
+
/**
* We validate access by posting an empty payload to the Splunk endpoint.
*
@@ -111,55 +185,51 @@ public Config validate(Map connectorConfigs) {
* For an invalid hostname and other errors, the Java UnknownHostException and other similar
* Exceptions are thrown.
*
- * @param connectorConfig The connector configuration
+ * @param httpClient HEC HTTP Client
+ * @param hecConfig The HEC configuration
* @param configValues The configuration ConfigValues
*/
- private void validateAccess(SplunkSinkConnectorConfig connectorConfig, Map configValues) {
- HecConfig hecConfig = connectorConfig.getHecConfig();
-
- try (CloseableHttpClient httpClient = createHttpClient(hecConfig)) {
- List uris = hecConfig.getUris();
-
- Map validationFailedIndexers = new LinkedHashMap<>();
-
- for (String uri : uris) {
- log.trace("Validating " + uri);
- HttpPost request = new HttpPost(uri + "/services/collector");
- request.setEntity(new StringEntity(""));
-
- request.addHeader(HttpHeaders.AUTHORIZATION, String.format("Splunk %s", hecConfig.getToken()));
-
- int status = -1;
- try (CloseableHttpResponse response = httpClient.execute(request)) {
- status = response.getStatusLine().getStatusCode();
- if (status == 400) {
- log.trace("Validation succeeded for indexer {}", uri);
- } else if (status == 403) {
- log.trace("Invalid HEC token for indexer {}", uri);
- validationFailedIndexers.put(uri, response.getStatusLine().toString());
- } else {
- log.trace("Validation failed for {}", uri);
- validationFailedIndexers.put(uri, response.getStatusLine().toString());
- }
- } catch (Exception e) {
- log.error("Caught exception while validating", e);
- validationFailedIndexers.put(uri, e.getMessage());
+ private void validateAccess(
+ CloseableHttpClient httpClient,
+ HecConfig hecConfig,
+ Map configValues
+ ) throws UnsupportedEncodingException {
+ List uris = hecConfig.getUris();
+
+ Map accessFailedCollectors = new LinkedHashMap<>();
+
+ for (String uri : uris) {
+ log.trace("Validating " + uri);
+ HttpPost request = new HttpPost(uri + COLLECTOR_ENDPOINT);
+ request.setEntity(new StringEntity(""));
+
+ request.addHeader(HttpHeaders.AUTHORIZATION, String.format("Splunk %s", hecConfig.getToken()));
+
+ int status = -1;
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ status = response.getStatusLine().getStatusCode();
+ if (status == 400) {
+ log.trace("Validation succeeded for collector {}", uri);
+ } else if (status == 403) {
+ log.trace("Invalid HEC token for collector {}", uri);
+ accessFailedCollectors.put(uri, response.getStatusLine().toString());
+ } else {
+ log.trace("Validation failed for {}", uri);
+ accessFailedCollectors.put(uri, response.getStatusLine().toString());
}
+ } catch (Exception e) {
+ log.error("Caught exception while validating", e);
+ accessFailedCollectors.put(uri, e.getMessage());
}
+ }
- if (!validationFailedIndexers.isEmpty()) {
- log.trace("Validation failed: " + validationFailedIndexers);
- recordErrors(configValues,
- "Validation Failed: " + validationFailedIndexers,
- SplunkSinkConnectorConfig.URI_CONF,
- SplunkSinkConnectorConfig.TOKEN_CONF);
- }
- } catch (IOException e) {
- log.error("Configuration validation error", e);
- recordErrors(configValues,
- "Configuration validation error: " + e.getMessage(),
- SplunkSinkConnectorConfig.URI_CONF,
- SplunkSinkConnectorConfig.TOKEN_CONF);
+ if (!accessFailedCollectors.isEmpty()) {
+ log.trace("Validation failed: " + accessFailedCollectors);
+ recordErrors(
+ configValues,
+ "Validation Failed: " + accessFailedCollectors,
+ SplunkSinkConnectorConfig.TOKEN_CONF
+ );
}
}
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 5ba2d16e..2958cf42 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -44,19 +44,22 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
class SplunkSinkConnecterTest {
- private String SPLUNK_URI_HOST1 = "https://www.host1.com:1111/";
+ final private String SPLUNK_URI_HOST1 = "https://www.host1.com:1111/";
@Mock
- CloseableHttpResponse validHttpResponse;
+ CloseableHttpResponse okHttpResponse;
@Mock
- CloseableHttpResponse unauthorizedHttpResponse;
+ CloseableHttpResponse badRequestHttpResponse;
+
+ @Mock
+ CloseableHttpResponse forbiddenHttpResponse;
@Mock
CloseableHttpResponse notFoundHttpResponse;
@@ -73,16 +76,21 @@ void setUp() {
doReturn(httpClient)
.when(connector)
- .createHttpClient(anyObject());
+ .createHttpClient(any());
+
+ when(okHttpResponse.getStatusLine()).
+ thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_OK, "OK"));
+ when(okHttpResponse.getEntity())
+ .thenReturn(new StringEntity("{\"text\":\"HEC is healthy\",\"code\":17}", ContentType.APPLICATION_JSON));
- when(validHttpResponse.getStatusLine()).
+ when(badRequestHttpResponse.getStatusLine()).
thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_BAD_REQUEST, "Bad Request"));
- when(validHttpResponse.getEntity())
+ when(badRequestHttpResponse.getEntity())
.thenReturn(new StringEntity("{\"text\":\"No data\",\"code\":5}", ContentType.APPLICATION_JSON));
- when(unauthorizedHttpResponse.getStatusLine())
- .thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_FORBIDDEN, "Unauthorized"));
- when(unauthorizedHttpResponse.getEntity())
+ when(forbiddenHttpResponse.getStatusLine())
+ .thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_FORBIDDEN, "Forbidden"));
+ when(forbiddenHttpResponse.getEntity())
.thenReturn(new StringEntity("{\"text\":\"Invalid token\",\"code\":4}", ContentType.APPLICATION_JSON));
when(notFoundHttpResponse.getStatusLine()).
@@ -136,7 +144,8 @@ public void testValidationEmptyConfig() {
public void testValidationSuccess() throws IOException {
Map connectorConfig = getConnectorConfig();
- doReturn(validHttpResponse)
+ doReturn(okHttpResponse)
+ .doReturn(badRequestHttpResponse)
.when(httpClient)
.execute(any());
@@ -147,10 +156,10 @@ public void testValidationSuccess() throws IOException {
}
@Test
- public void testValidationFailure() throws IOException {
+ public void testConnectionFailure() throws IOException {
Map connectorConfig = getConnectorConfig();
- doReturn(unauthorizedHttpResponse)
+ doThrow(new UnknownHostException("Host not found"))
.when(httpClient)
.execute(any());
@@ -160,23 +169,42 @@ public void testValidationFailure() throws IOException {
.collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
- assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(SPLUNK_URI_HOST1));
+ }
+
+ @Test
+ public void testAccessFailure() throws IOException {
+ Map connectorConfig = getConnectorConfig();
+
+ doReturn(okHttpResponse)
+ .doReturn(forbiddenHttpResponse)
+ .when(httpClient)
+ .execute(any());
+
+ Config config = connector.validate(connectorConfig);
+
+ Map> errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+
assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1));
}
@Test
- public void testMultipleValidationFailure() throws IOException {
+ public void testMultipleConnectionFailure() throws IOException {
Map connectorConfig = getConnectorConfig();
String host2 = "https://www.host2.com:2222/";
String host3 = "https://www.host3.com:3333/";
connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF,
SPLUNK_URI_HOST1 + "," + host2 + "," + host3);
- doReturn(unauthorizedHttpResponse)
+ doReturn(notFoundHttpResponse)
.doThrow(new UnknownHostException("Host not found"))
- .doReturn(notFoundHttpResponse)
+ .doReturn(okHttpResponse)
.when(httpClient)
.execute(any());
@@ -187,13 +215,40 @@ public void testMultipleValidationFailure() throws IOException {
assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
- assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(SPLUNK_URI_HOST1));
assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host2));
- assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host3));
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).get(0).contains(host3));
+ }
- assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1));
+ @Test
+ public void testMultipleAccessFailure() throws IOException {
+ Map connectorConfig = getConnectorConfig();
+ String host2 = "https://www.host2.com:2222/";
+ String host3 = "https://www.host3.com:3333/";
+ connectorConfig.put(SplunkSinkConnectorConfig.URI_CONF,
+ SPLUNK_URI_HOST1 + "," + host2 + "," + host3);
+
+ doReturn(okHttpResponse)
+ .doReturn(okHttpResponse)
+ .doReturn(okHttpResponse)
+ .doReturn(badRequestHttpResponse)
+ .doThrow(new UnknownHostException("Host not found"))
+ .doReturn(notFoundHttpResponse)
+ .when(httpClient)
+ .execute(any());
+
+ Config config = connector.validate(connectorConfig);
+
+ Map> errorMessages = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
+
+
+ assertTrue(errorMessages.get(SplunkSinkConnectorConfig.URI_CONF).isEmpty());
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).isEmpty());
+
+ assertFalse(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(SPLUNK_URI_HOST1));
assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(host2));
assertTrue(errorMessages.get(SplunkSinkConnectorConfig.TOKEN_CONF).get(0).contains(host3));
}
From dc0b459f97c99ddb173e511028c758fc03228164 Mon Sep 17 00:00:00 2001
From: Raul Agepati <35212250+ragepati@users.noreply.github.com>
Date: Wed, 5 Jan 2022 12:07:05 +0530
Subject: [PATCH 03/17] CCLOG-1248: Bump up Splunk Sink connector version (#11)
---
src/main/resources/version.properties | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/resources/version.properties b/src/main/resources/version.properties
index c300f0f3..23ee6de7 100644
--- a/src/main/resources/version.properties
+++ b/src/main/resources/version.properties
@@ -1,3 +1,3 @@
githash=
gitbranch=2.0.5.x
-gitversion=v2.0.5.1
+gitversion=v2.0.5.2
From d1e7f95af1c19b02f6226da339ddba9dd97dc82d Mon Sep 17 00:00:00 2001
From: naveenmall11
Date: Fri, 11 Nov 2022 20:26:25 +0530
Subject: [PATCH 04/17] RCCA-9576 validate in case of success response
---
src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index aa82e9b9..ee30ec4f 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -208,7 +208,7 @@ private void validateAccess(
int status = -1;
try (CloseableHttpResponse response = httpClient.execute(request)) {
status = response.getStatusLine().getStatusCode();
- if (status == 400) {
+ if (status == 400 || status == 200) {
log.trace("Validation succeeded for collector {}", uri);
} else if (status == 403) {
log.trace("Invalid HEC token for collector {}", uri);
From afb03b7e8119501293ff962960a661242c74436c Mon Sep 17 00:00:00 2001
From: naveenmall11
Date: Mon, 14 Nov 2022 13:59:31 +0530
Subject: [PATCH 05/17] adding test
---
.../kafka/connect/SplunkSinkConnecterTest.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 2958cf42..f14673b2 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -155,6 +155,21 @@ public void testValidationSuccess() throws IOException {
}
}
+ @Test
+ public void testValidationSuccessWithSuccessResponse() throws IOException {
+ Map connectorConfig = getConnectorConfig();
+
+ doReturn(okHttpResponse)
+ .doReturn(okHttpResponse)
+ .when(httpClient)
+ .execute(any());
+
+ Config config = connector.validate(connectorConfig);
+ for (ConfigValue value : config.configValues()) {
+ assertTrue(value.errorMessages().isEmpty());
+ }
+ }
+
@Test
public void testConnectionFailure() throws IOException {
Map connectorConfig = getConnectorConfig();
From 38486af9351447508a2d92e87e8f17e67a3a7bd8 Mon Sep 17 00:00:00 2001
From: naveenmall11
Date: Thu, 17 Nov 2022 15:37:46 +0530
Subject: [PATCH 06/17] remove logging of record value
---
src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 2e2af396..57c4f3a7 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -401,7 +401,7 @@ private Event createHecEventFrom(final SinkRecord record) {
event.setTied(record);
event.addFields(connectorConfig.enrichments);
} catch(Exception e) {
- log.trace("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
+ log.trace("event does not follow correct HEC pre-formatted format for record having offset: {} and topic: {}", record.kafkaOffset(), record.topic());
event = createHECEventNonFormatted(record);
}
} else {
From d609851478bcf864fac7f55f67c016205795f5d4 Mon Sep 17 00:00:00 2001
From: naveenmall11
Date: Fri, 18 Nov 2022 13:35:40 +0530
Subject: [PATCH 07/17] logging kafka partition info too
---
src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index 57c4f3a7..407c2c97 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -401,7 +401,7 @@ private Event createHecEventFrom(final SinkRecord record) {
event.setTied(record);
event.addFields(connectorConfig.enrichments);
} catch(Exception e) {
- log.trace("event does not follow correct HEC pre-formatted format for record having offset: {} and topic: {}", record.kafkaOffset(), record.topic());
+ log.trace("event does not follow correct HEC pre-formatted format for record having offset: {}, topic Name: {} and topic Partition: {}", record.kafkaOffset(), record.topic(), record.kafkaPartition());
event = createHECEventNonFormatted(record);
}
} else {
From e92cd79721a137c779a9d235e1c849f009cbd916 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Thu, 17 Aug 2023 15:49:47 +0530
Subject: [PATCH 08/17] Added timeout to httpclient in splunk sink
---
src/main/java/com/splunk/hecclient/Hec.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java
index e0c5b98a..6c8b97f5 100644
--- a/src/main/java/com/splunk/hecclient/Hec.java
+++ b/src/main/java/com/splunk/hecclient/Hec.java
@@ -274,6 +274,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
return new HttpClientBuilder().setDisableSSLCertVerification(config.getDisableSSLCertVerification())
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
+ .setSocketTimeout(config.getSocketTimeout())
.build();
}
@@ -286,6 +287,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSslContext(context)
+ .setSocketTimeout(config.getSocketTimeout())
.build();
}
else {
From 5042b72f734335865c11100539ce17703b9323e5 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Wed, 8 Nov 2023 09:14:51 +0530
Subject: [PATCH 09/17] Adding timeout for httpclient
---
src/main/java/com/splunk/hecclient/HttpClientBuilder.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
index 4047ca37..8d83fac9 100644
--- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
+++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
@@ -74,6 +74,9 @@ public CloseableHttpClient build() {
.setSoTimeout(socketTimeout * 1000)
.build();
RequestConfig requestConfig = RequestConfig.custom()
+ .setSocketTimeout(socketTimeout)
+ .setConnectionRequestTimeout(socketTimeout)
+ .setConnectTimeout(socketTimeout)
.setCookieSpec(CookieSpecs.STANDARD)
.build();
From 9ac6db1bd35c54a42c80abe256362429dc67bc87 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Mon, 20 Nov 2023 21:00:35 +0530
Subject: [PATCH 10/17] wip
---
src/main/java/com/splunk/hecclient/Hec.java | 2 ++
.../java/com/splunk/hecclient/HecConfig.java | 20 +++++++++++++++++++
.../splunk/hecclient/HttpClientBuilder.java | 18 ++++++++++++++---
.../connect/SplunkSinkConnectorConfig.java | 12 +++++++++++
4 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java
index 6c8b97f5..aa325d80 100644
--- a/src/main/java/com/splunk/hecclient/Hec.java
+++ b/src/main/java/com/splunk/hecclient/Hec.java
@@ -275,6 +275,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setMaxConnectionPoolSizePerDestination(poolSizePerDest)
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSocketTimeout(config.getSocketTimeout())
+ .setConnectionTimeout(config.getConnectionTimeout())
+ .setConnectionRequestTimeout(config.getConnectionRequestTimeout())
.build();
}
diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java
index 3a1987a4..bd2e1eb0 100644
--- a/src/main/java/com/splunk/hecclient/HecConfig.java
+++ b/src/main/java/com/splunk/hecclient/HecConfig.java
@@ -29,6 +29,8 @@ public final class HecConfig {
private int ackPollInterval = 10; // in seconds
private int ackPollThreads = 2;
private int socketTimeout = 60; // in seconds
+ private int connectionTimeout = 60; // in seconds
+ private int connectionRequestTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in byte
private int backoffThresholdSeconds = 60 * 1000;
private boolean enableChannelTracking = false;
@@ -62,6 +64,14 @@ public int getSocketTimeout() {
return socketTimeout;
}
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public int getConnectionRequestTimeout() {
+ return connectionRequestTimeout;
+ }
+
public int getSocketSendBufferSize() {
return socketSendBufferSize;
}
@@ -119,6 +129,16 @@ public HecConfig setSocketTimeout(int timeout /*seconds*/) {
return this;
}
+ public HecConfig setConnectionTimeout(int timeout /*seconds*/) {
+ connectionTimeout = timeout;
+ return this;
+ }
+
+ public HecConfig setConnectionRequestTimeout(int timeout /*seconds*/) {
+ connectionRequestTimeout = timeout;
+ return this;
+ }
+
public HecConfig setSocketSendBufferSize(int bufSize /*bytes*/) {
socketSendBufferSize = bufSize;
return this;
diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
index 8d83fac9..da0979bb 100644
--- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
+++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
@@ -33,6 +33,8 @@ public final class HttpClientBuilder {
private int maxConnectionPoolSizePerDestination = 4;
private int maxConnectionPoolSize = 4 * 2;
private int socketTimeout = 60; // in seconds
+ private int connectionRequestTimeout = 60; // in seconds
+ private int connectionTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
private boolean disableSSLCertVerification = false;
private SSLContext sslContext = null;
@@ -52,6 +54,16 @@ public HttpClientBuilder setSocketTimeout(int timeout /*seconds*/) {
return this;
}
+ public HttpClientBuilder setConnectionRequestTimeout(int timeout /*seconds*/) {
+ this.connectionRequestTimeout = timeout;
+ return this;
+ }
+
+ public HttpClientBuilder setConnectionTimeout(int timeout /*seconds*/) {
+ this.connectionTimeout = timeout;
+ return this;
+ }
+
public HttpClientBuilder setSocketSendBufferSize(int bufSize /*bytes*/) {
this.socketSendBufferSize = bufSize;
return this;
@@ -74,9 +86,9 @@ public CloseableHttpClient build() {
.setSoTimeout(socketTimeout * 1000)
.build();
RequestConfig requestConfig = RequestConfig.custom()
- .setSocketTimeout(socketTimeout)
- .setConnectionRequestTimeout(socketTimeout)
- .setConnectTimeout(socketTimeout)
+ .setSocketTimeout(socketTimeout * 1000)
+ .setConnectionRequestTimeout(connectionRequestTimeout * 1000)
+ .setConnectTimeout(connectionTimeout * 1000)
.setCookieSpec(CookieSpecs.STANDARD)
.build();
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 585e07ce..1db68683 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -47,6 +47,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive";
static final String HEC_THREDS_CONF = "splunk.hec.threads";
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
+ static final String CONENCTION_TIMEOUT_CONF = "splunk.hec.connection.timeout"; // seconds
+ static final String CONENCTION_REQUEST_TIMEOUT_CONF = "splunk.hec.connection.request.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
// Acknowledgement Parameters
@@ -193,6 +195,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final boolean httpKeepAlive;
final int numberOfThreads;
final int socketTimeout;
+ final int connectionTimeout;
+ final int connectionRequestTimeout;
final boolean validateCertificates;
final boolean enableCompression;
final int lbPollInterval;
@@ -247,6 +251,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
flushWindow = getInt(FLUSH_WINDOW_CONF);
totalHecChannels = getInt(TOTAL_HEC_CHANNEL_CONF);
socketTimeout = getInt(SOCKET_TIMEOUT_CONF);
+ connectionTimeout = getInt(CONENCTION_TIMEOUT_CONF);
+ connectionRequestTimeout = getInt(CONENCTION_REQUEST_TIMEOUT_CONF);
enrichments = parseEnrichments(getString(ENRICHMENT_CONF));
trackData = getBoolean(TRACK_DATA_CONF);
useRecordTimestamp = getBoolean(USE_RECORD_TIMESTAMP_CONF);
@@ -288,6 +294,8 @@ public static ConfigDef conf() {
.define(FLUSH_WINDOW_CONF, ConfigDef.Type.INT, 30, ConfigDef.Importance.LOW, FLUSH_WINDOW_DOC)
.define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC)
.define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
+ .define(CONNECTION_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
+ .define(CONENCTION_REQUEST_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
.define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ENRICHMENT_DOC)
.define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC)
.define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC)
@@ -315,6 +323,8 @@ public HecConfig getHecConfig() {
HecConfig config = new HecConfig(Arrays.asList(splunkURI.split(",")), splunkToken);
config.setDisableSSLCertVerification(!validateCertificates)
.setSocketTimeout(socketTimeout)
+ .setConnectionTimeout(connectionTimeout)
+ .setConnectionRequestTimeout(connectionRequestTimeout)
.setMaxHttpConnectionPerChannel(maxHttpConnPerChannel)
.setTotalChannels(totalHecChannels)
.setEventBatchTimeout(eventBatchTimeout)
@@ -349,6 +359,8 @@ public String toString() {
+ "validateCertificates:" + validateCertificates + ", "
+ "trustStorePath:" + trustStorePath + ", "
+ "socketTimeout:" + socketTimeout + ", "
+ + "connectionTimeout:" + connectionTimeout + ", "
+ + "connectionRequestTimeout:" + connectionRequestTimeout + ", "
+ "eventBatchTimeout:" + eventBatchTimeout + ", "
+ "ackPollInterval:" + ackPollInterval + ", "
+ "ackPollThreads:" + ackPollThreads + ", "
From 616ed6bbf94f7e1b7663ccff081c5a67703c6ad4 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Tue, 21 Nov 2023 13:09:30 +0530
Subject: [PATCH 11/17] Added connection timeout and connection request timeout
to http client
---
.../kafka/connect/SplunkSinkConnectorConfig.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 1db68683..e85409b8 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -47,8 +47,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
static final String HTTP_KEEPALIVE_CONF = "splunk.hec.http.keepalive";
static final String HEC_THREDS_CONF = "splunk.hec.threads";
static final String SOCKET_TIMEOUT_CONF = "splunk.hec.socket.timeout"; // seconds
- static final String CONENCTION_TIMEOUT_CONF = "splunk.hec.connection.timeout"; // seconds
- static final String CONENCTION_REQUEST_TIMEOUT_CONF = "splunk.hec.connection.request.timeout"; // seconds
+ static final String CONNECTION_TIMEOUT_CONF = "splunk.hec.connection.timeout"; // seconds
+ static final String CONNECTION_REQUEST_TIMEOUT_CONF = "splunk.hec.connection.request.timeout"; // seconds
static final String SSL_VALIDATE_CERTIFICATES_CONF = "splunk.hec.ssl.validate.certs";
static final String ENABLE_COMPRESSSION_CONF = "splunk.hec.enable.compression";
// Acknowledgement Parameters
@@ -251,8 +251,8 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
flushWindow = getInt(FLUSH_WINDOW_CONF);
totalHecChannels = getInt(TOTAL_HEC_CHANNEL_CONF);
socketTimeout = getInt(SOCKET_TIMEOUT_CONF);
- connectionTimeout = getInt(CONENCTION_TIMEOUT_CONF);
- connectionRequestTimeout = getInt(CONENCTION_REQUEST_TIMEOUT_CONF);
+ connectionTimeout = getInt(CONNECTION_TIMEOUT_CONF);
+ connectionRequestTimeout = getInt(CONNECTION_REQUEST_TIMEOUT_CONF);
enrichments = parseEnrichments(getString(ENRICHMENT_CONF));
trackData = getBoolean(TRACK_DATA_CONF);
useRecordTimestamp = getBoolean(USE_RECORD_TIMESTAMP_CONF);
@@ -295,7 +295,7 @@ public static ConfigDef conf() {
.define(TOTAL_HEC_CHANNEL_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.HIGH, TOTAL_HEC_CHANNEL_DOC)
.define(SOCKET_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
.define(CONNECTION_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
- .define(CONENCTION_REQUEST_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
+ .define(CONNECTION_REQUEST_TIMEOUT_CONF, ConfigDef.Type.INT, 60, ConfigDef.Importance.LOW, SOCKET_TIMEOUT_DOC)
.define(ENRICHMENT_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ENRICHMENT_DOC)
.define(TRACK_DATA_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, TRACK_DATA_DOC)
.define(USE_RECORD_TIMESTAMP_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, USE_RECORD_TIMESTAMP_DOC)
From ed1c1fd6ee2c665717c749acfc627a0ea935dcc1 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Tue, 21 Nov 2023 13:12:45 +0530
Subject: [PATCH 12/17] Added connection timeout and connection request timeout
to httpclient
---
src/main/java/com/splunk/hecclient/Hec.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java
index aa325d80..4f3c662b 100644
--- a/src/main/java/com/splunk/hecclient/Hec.java
+++ b/src/main/java/com/splunk/hecclient/Hec.java
@@ -290,6 +290,8 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setMaxConnectionPoolSize(poolSizePerDest * config.getUris().size())
.setSslContext(context)
.setSocketTimeout(config.getSocketTimeout())
+ .setConnectionTimeout(config.getConnectionTimeout())
+ .setConnectionRequestTimeout(config.getConnectionRequestTimeout())
.build();
}
else {
From 5591baae26ece8b79722ad5dbf79d864c277d86a Mon Sep 17 00:00:00 2001
From: Rishabh Sharma
Date: Mon, 4 Dec 2023 17:43:11 +0530
Subject: [PATCH 13/17] Added config to disable hostname verification (#19)
* Added config to disable hostname verification
---
src/main/java/com/splunk/hecclient/Hec.java | 2 ++
.../java/com/splunk/hecclient/HecConfig.java | 9 +++++++++
.../com/splunk/hecclient/HttpClientBuilder.java | 14 ++++++++++++++
.../kafka/connect/SplunkSinkConnectorConfig.java | 9 ++++++++-
.../splunk/hecclient/HttpClientBuilderTest.java | 13 +++++++++++++
.../connect/SplunkSinkConnectorConfigTest.java | 16 ++++++++++++++++
6 files changed, 62 insertions(+), 1 deletion(-)
diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java
index 4f3c662b..e598a532 100644
--- a/src/main/java/com/splunk/hecclient/Hec.java
+++ b/src/main/java/com/splunk/hecclient/Hec.java
@@ -277,6 +277,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setSocketTimeout(config.getSocketTimeout())
.setConnectionTimeout(config.getConnectionTimeout())
.setConnectionRequestTimeout(config.getConnectionRequestTimeout())
+ .setDisableHostnameVerification(config.getDisableHostnameVerification())
.build();
}
@@ -292,6 +293,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setSocketTimeout(config.getSocketTimeout())
.setConnectionTimeout(config.getConnectionTimeout())
.setConnectionRequestTimeout(config.getConnectionRequestTimeout())
+ .setDisableHostnameVerification(config.getDisableHostnameVerification())
.build();
}
else {
diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java
index bd2e1eb0..c0c38ca0 100644
--- a/src/main/java/com/splunk/hecclient/HecConfig.java
+++ b/src/main/java/com/splunk/hecclient/HecConfig.java
@@ -38,6 +38,7 @@ public final class HecConfig {
private String trustStorePath;
private String trustStorePassword;
private int lbPollInterval = 120; // in seconds
+ private Boolean disableHostnameVerification;
public HecConfig(List uris, String token) {
this.uris = uris;
@@ -198,4 +199,12 @@ public HecConfig setBackoffThresholdSeconds(int backoffSeconds) {
backoffThresholdSeconds = backoffSeconds * 1000;
return this;
}
+
+ public boolean getDisableHostnameVerification() {
+ return this.disableHostnameVerification;
+ }
+
+ public void setDisableHostnameVerification(Boolean disableHostnameVerification) {
+ this.disableHostnameVerification = disableHostnameVerification;
+ }
}
diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
index da0979bb..91f3a334 100644
--- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
+++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
@@ -18,6 +18,7 @@
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -37,6 +38,7 @@ public final class HttpClientBuilder {
private int connectionTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
private boolean disableSSLCertVerification = false;
+ private boolean disableHostnameVerification = false;
private SSLContext sslContext = null;
public HttpClientBuilder setMaxConnectionPoolSizePerDestination(int connections) {
@@ -74,6 +76,15 @@ public HttpClientBuilder setDisableSSLCertVerification(boolean disableVerificati
return this;
}
+ public HttpClientBuilder setDisableHostnameVerification(boolean disableHostnameVerification) {
+ this.disableHostnameVerification = disableHostnameVerification;
+ return this;
+ }
+
+ public Boolean getDisableHostnameVerification() {
+ return this.disableHostnameVerification;
+ }
+
public HttpClientBuilder setSslContext(SSLContext context) {
this.sslContext = context;
return this;
@@ -138,6 +149,9 @@ private SSLConnectionSocketFactory getSecureSSLConnectionFactory() {
if (this.sslContext == null) {
return null; // use system default one
} else {
+ if (this.getDisableHostnameVerification()) {
+ return new SSLConnectionSocketFactory(this.sslContext, NoopHostnameVerifier.INSTANCE);
+ }
return new SSLConnectionSocketFactory(this.sslContext);
}
}
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index e85409b8..1601be9e 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -72,6 +72,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
// Trust store
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
+ static final String DISABLE_HOSTNAME_VERIFICATION_CONF = "splunk.disable.hostname.verification";
//Headers
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
static final String HEADER_CUSTOM_CONF = "splunk.header.custom";
@@ -82,6 +83,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
// Load Balancer
static final String LB_POLL_INTERVAL_CONF = "splunk.hec.lb.poll.interval";
+
// Kafka configuration description strings
// Required Parameters
static final String URI_DOC = "Splunk HEC URIs. Either a list of FQDNs or IPs of all Splunk indexers, separated "
@@ -167,6 +169,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
// TBD
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
+ static final String DISABLE_HOSTNAME_VERIFICATION_DOC = "Whether to disable hostname verification in case of SSL Configs being used.";
static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
@@ -218,6 +221,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final boolean trackData;
final boolean hasTrustStorePath;
+ final boolean disableHostnameVerification;
final String trustStorePath;
final String trustStorePassword;
@@ -238,6 +242,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
sourcetypes = getString(SOURCETYPE_CONF);
sources = getString(SOURCE_CONF);
httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF);
+ disableHostnameVerification = getBoolean(DISABLE_HOSTNAME_VERIFICATION_CONF);
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
@@ -287,6 +292,7 @@ public static ConfigDef conf() {
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
+ .define(DISABLE_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, DISABLE_HOSTNAME_VERIFICATION_DOC)
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
.define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
@@ -336,7 +342,8 @@ public HecConfig getHecConfig() {
.setBackoffThresholdSeconds(backoffThresholdSeconds)
.setTrustStorePath(trustStorePath)
.setTrustStorePassword(trustStorePassword)
- .setHasCustomTrustStore(hasTrustStorePath);
+ .setHasCustomTrustStore(hasTrustStorePath)
+ .setDisableHostnameVerification(disableHostnameVerification);
return config;
}
diff --git a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
index 3d5fbfff..1d2d471b 100644
--- a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
+++ b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
@@ -44,6 +44,19 @@ public void buildSecureDefault() {
.build();
Assert.assertNotNull(client);
}
+
+ @Test
+ public void buildWithDisabledHostnameVerification() {
+ HttpClientBuilder builder = new HttpClientBuilder();
+ CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
+ .setMaxConnectionPoolSize(2)
+ .setSocketSendBufferSize(1024)
+ .setSocketTimeout(120)
+ .setDisableSSLCertVerification(false)
+ .setDisableHostnameVerification(true)
+ .build();
+ Assert.assertNotNull(client);
+ }
@Test
public void buildSecureCustomKeystore() {
HttpClientBuilder builder = new HttpClientBuilder();
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java
index a59752ab..4aaf2f89 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java
@@ -74,6 +74,22 @@ public void getHecConfig() {
}
}
+ @Test
+ public void getDisableHostnameVerificationConfig() {
+ UnitUtil uu = new UnitUtil(0);
+ Map taskConfig = uu.createTaskConfig();
+ taskConfig.put(SplunkSinkConnectorConfig.DISABLE_HOSTNAME_VERIFICATION_CONF, String.valueOf(false));
+ SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
+ HecConfig config = connectorConfig.getHecConfig();
+ Assert.assertEquals(false, config.getDisableSSLCertVerification());
+
+ taskConfig.put(SplunkSinkConnectorConfig.DISABLE_HOSTNAME_VERIFICATION_CONF, String.valueOf(true));
+ connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
+ config = connectorConfig.getHecConfig();
+ Assert.assertEquals(true, config.getDisableSSLCertVerification());
+
+ }
+
@Test
public void getHecConfigCustomKeystore() {
UnitUtil uu = new UnitUtil(1);
From cbd3e7442b296c54f222b5222a38c7652efafc88 Mon Sep 17 00:00:00 2001
From: Rishabh Sharma
Date: Fri, 5 Jan 2024 15:05:21 +0530
Subject: [PATCH 14/17] Revert "Added config to disable hostname verification
(#19)" (#20)
This reverts commit 5591baae26ece8b79722ad5dbf79d864c277d86a.
---
src/main/java/com/splunk/hecclient/Hec.java | 2 --
.../java/com/splunk/hecclient/HecConfig.java | 9 ---------
.../com/splunk/hecclient/HttpClientBuilder.java | 14 --------------
.../kafka/connect/SplunkSinkConnectorConfig.java | 9 +--------
.../splunk/hecclient/HttpClientBuilderTest.java | 13 -------------
.../connect/SplunkSinkConnectorConfigTest.java | 16 ----------------
6 files changed, 1 insertion(+), 62 deletions(-)
diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java
index e598a532..4f3c662b 100644
--- a/src/main/java/com/splunk/hecclient/Hec.java
+++ b/src/main/java/com/splunk/hecclient/Hec.java
@@ -277,7 +277,6 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setSocketTimeout(config.getSocketTimeout())
.setConnectionTimeout(config.getConnectionTimeout())
.setConnectionRequestTimeout(config.getConnectionRequestTimeout())
- .setDisableHostnameVerification(config.getDisableHostnameVerification())
.build();
}
@@ -293,7 +292,6 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
.setSocketTimeout(config.getSocketTimeout())
.setConnectionTimeout(config.getConnectionTimeout())
.setConnectionRequestTimeout(config.getConnectionRequestTimeout())
- .setDisableHostnameVerification(config.getDisableHostnameVerification())
.build();
}
else {
diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java
index c0c38ca0..bd2e1eb0 100644
--- a/src/main/java/com/splunk/hecclient/HecConfig.java
+++ b/src/main/java/com/splunk/hecclient/HecConfig.java
@@ -38,7 +38,6 @@ public final class HecConfig {
private String trustStorePath;
private String trustStorePassword;
private int lbPollInterval = 120; // in seconds
- private Boolean disableHostnameVerification;
public HecConfig(List uris, String token) {
this.uris = uris;
@@ -199,12 +198,4 @@ public HecConfig setBackoffThresholdSeconds(int backoffSeconds) {
backoffThresholdSeconds = backoffSeconds * 1000;
return this;
}
-
- public boolean getDisableHostnameVerification() {
- return this.disableHostnameVerification;
- }
-
- public void setDisableHostnameVerification(Boolean disableHostnameVerification) {
- this.disableHostnameVerification = disableHostnameVerification;
- }
}
diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
index 91f3a334..da0979bb 100644
--- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
+++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java
@@ -18,7 +18,6 @@
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -38,7 +37,6 @@ public final class HttpClientBuilder {
private int connectionTimeout = 60; // in seconds
private int socketSendBufferSize = 8 * 1024 * 1024; // in bytes
private boolean disableSSLCertVerification = false;
- private boolean disableHostnameVerification = false;
private SSLContext sslContext = null;
public HttpClientBuilder setMaxConnectionPoolSizePerDestination(int connections) {
@@ -76,15 +74,6 @@ public HttpClientBuilder setDisableSSLCertVerification(boolean disableVerificati
return this;
}
- public HttpClientBuilder setDisableHostnameVerification(boolean disableHostnameVerification) {
- this.disableHostnameVerification = disableHostnameVerification;
- return this;
- }
-
- public Boolean getDisableHostnameVerification() {
- return this.disableHostnameVerification;
- }
-
public HttpClientBuilder setSslContext(SSLContext context) {
this.sslContext = context;
return this;
@@ -149,9 +138,6 @@ private SSLConnectionSocketFactory getSecureSSLConnectionFactory() {
if (this.sslContext == null) {
return null; // use system default one
} else {
- if (this.getDisableHostnameVerification()) {
- return new SSLConnectionSocketFactory(this.sslContext, NoopHostnameVerifier.INSTANCE);
- }
return new SSLConnectionSocketFactory(this.sslContext);
}
}
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 1601be9e..e85409b8 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -72,7 +72,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
// Trust store
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
- static final String DISABLE_HOSTNAME_VERIFICATION_CONF = "splunk.disable.hostname.verification";
//Headers
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
static final String HEADER_CUSTOM_CONF = "splunk.header.custom";
@@ -83,7 +82,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
// Load Balancer
static final String LB_POLL_INTERVAL_CONF = "splunk.hec.lb.poll.interval";
-
// Kafka configuration description strings
// Required Parameters
static final String URI_DOC = "Splunk HEC URIs. Either a list of FQDNs or IPs of all Splunk indexers, separated "
@@ -169,7 +167,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
// TBD
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
- static final String DISABLE_HOSTNAME_VERIFICATION_DOC = "Whether to disable hostname verification in case of SSL Configs being used.";
static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
static final String HEADER_CUSTOM_DOC = "Setting will enable look for Record headers with these values and add them"
@@ -221,7 +218,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
final boolean trackData;
final boolean hasTrustStorePath;
- final boolean disableHostnameVerification;
final String trustStorePath;
final String trustStorePassword;
@@ -242,7 +238,6 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
sourcetypes = getString(SOURCETYPE_CONF);
sources = getString(SOURCE_CONF);
httpKeepAlive = getBoolean(HTTP_KEEPALIVE_CONF);
- disableHostnameVerification = getBoolean(DISABLE_HOSTNAME_VERIFICATION_CONF);
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
@@ -292,7 +287,6 @@ public static ConfigDef conf() {
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
- .define(DISABLE_HOSTNAME_VERIFICATION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, DISABLE_HOSTNAME_VERIFICATION_DOC)
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
.define(ACK_POLL_THREADS_CONF, ConfigDef.Type.INT, 2, ConfigDef.Importance.MEDIUM, ACK_POLL_THREADS_DOC)
@@ -342,8 +336,7 @@ public HecConfig getHecConfig() {
.setBackoffThresholdSeconds(backoffThresholdSeconds)
.setTrustStorePath(trustStorePath)
.setTrustStorePassword(trustStorePassword)
- .setHasCustomTrustStore(hasTrustStorePath)
- .setDisableHostnameVerification(disableHostnameVerification);
+ .setHasCustomTrustStore(hasTrustStorePath);
return config;
}
diff --git a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
index 1d2d471b..3d5fbfff 100644
--- a/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
+++ b/src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java
@@ -44,19 +44,6 @@ public void buildSecureDefault() {
.build();
Assert.assertNotNull(client);
}
-
- @Test
- public void buildWithDisabledHostnameVerification() {
- HttpClientBuilder builder = new HttpClientBuilder();
- CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
- .setMaxConnectionPoolSize(2)
- .setSocketSendBufferSize(1024)
- .setSocketTimeout(120)
- .setDisableSSLCertVerification(false)
- .setDisableHostnameVerification(true)
- .build();
- Assert.assertNotNull(client);
- }
@Test
public void buildSecureCustomKeystore() {
HttpClientBuilder builder = new HttpClientBuilder();
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java
index 4aaf2f89..a59752ab 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java
@@ -74,22 +74,6 @@ public void getHecConfig() {
}
}
- @Test
- public void getDisableHostnameVerificationConfig() {
- UnitUtil uu = new UnitUtil(0);
- Map taskConfig = uu.createTaskConfig();
- taskConfig.put(SplunkSinkConnectorConfig.DISABLE_HOSTNAME_VERIFICATION_CONF, String.valueOf(false));
- SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
- HecConfig config = connectorConfig.getHecConfig();
- Assert.assertEquals(false, config.getDisableSSLCertVerification());
-
- taskConfig.put(SplunkSinkConnectorConfig.DISABLE_HOSTNAME_VERIFICATION_CONF, String.valueOf(true));
- connectorConfig = new SplunkSinkConnectorConfig(taskConfig);
- config = connectorConfig.getHecConfig();
- Assert.assertEquals(true, config.getDisableSSLCertVerification());
-
- }
-
@Test
public void getHecConfigCustomKeystore() {
UnitUtil uu = new UnitUtil(1);
From bb413643e4cff47fa10d8f0ee590cbc36f5e9789 Mon Sep 17 00:00:00 2001
From: Rishabh Sharma
Date: Fri, 8 Mar 2024 14:56:33 +0530
Subject: [PATCH 15/17] updated snappy-java version to 1.1.10.5 to fix CVE
(#23)
* updated snappy.java.version to 1.1.10.4 by updating connect-plugins-parent to 0.9.23
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 277ae8d2..6c2a724f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
io.confluent
connect-plugins-parent
- 0.6.8
+ 0.9.23
com.github.splunk.kafka.connect
From 84079db29f9fc5cc9d800d52fb70b4d5eb550839 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Fri, 8 Mar 2024 15:37:32 +0530
Subject: [PATCH 16/17] Fixed CVEs CVE-2022-42003, CVE-2021-46877,
CVE-2022-42004, CVE-2020-36518
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 6c2a724f..07146791 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
com.fasterxml.jackson.core
jackson-databind
- 2.10.5.1
+ 2.13.4.1
compile
From e773b2724613a7a42593f529ee9a92793816d947 Mon Sep 17 00:00:00 2001
From: Rishabh
Date: Fri, 8 Mar 2024 15:38:51 +0530
Subject: [PATCH 17/17] updated com.fasterxml.jackson.core:jackson-databind to
2.13.5 to fix CVEs CVE-2022-42003, CVE-2021-46877, CVE-2022-42004,
CVE-2020-36518
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 07146791..8f59f5ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
com.fasterxml.jackson.core
jackson-databind
- 2.13.4.1
+ 2.13.5
compile