diff --git a/pom.xml b/pom.xml index d8ea2ce0..c4feb7dc 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,11 @@ 3.7 compile + + io.confluent + connect-utils + 0.3.1 + diff --git a/src/main/java/com/splunk/hecclient/Hec.java b/src/main/java/com/splunk/hecclient/Hec.java index f170245a..e145ff51 100644 --- a/src/main/java/com/splunk/hecclient/Hec.java +++ b/src/main/java/com/splunk/hecclient/Hec.java @@ -28,6 +28,7 @@ import java.security.KeyManagementException; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.errors.ConnectException; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; @@ -196,7 +197,7 @@ public static HecAckPoller createPoller(HecConfig config, PollerCallback callbac public Hec(HecConfig config, CloseableHttpClient httpClient, Poller poller, LoadBalancerInf loadBalancer) { for (int i = 0; i < config.getTotalChannels(); ) { for (String uri : config.getUris()) { - Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller); + Indexer indexer = new Indexer(uri, httpClient, poller, config); indexer.setKeepAlive(config.getHttpKeepAlive()); loadBalancer.add(indexer.getChannel().setTracking(config.getEnableChannelTracking())); i++; @@ -263,7 +264,13 @@ public final void close() { */ public static CloseableHttpClient createHttpClient(final HecConfig config) { int poolSizePerDest = config.getMaxHttpConnectionPerChannel(); - + if (config.kerberosAuthEnabled()) { + try { + return new HttpClientBuilder().buildKerberosClient(); + } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException ex) { + throw new ConnectException("Unable to build Kerberos Client", ex); + } + } // Code block for default client construction if(!config.getHasCustomTrustStore() && StringUtils.isBlank(config.getTrustStorePath()) && diff --git a/src/main/java/com/splunk/hecclient/HecConfig.java b/src/main/java/com/splunk/hecclient/HecConfig.java index 36cf1ea5..b9997e3d 100644 --- a/src/main/java/com/splunk/hecclient/HecConfig.java +++ b/src/main/java/com/splunk/hecclient/HecConfig.java @@ -34,6 +34,9 @@ public final class HecConfig { private boolean hasCustomTrustStore = false; private String trustStorePath; private String trustStorePassword; + private String kerberosPrincipal; + private String kerberosUser; + private String kerberosKeytabLocation; public HecConfig(List uris, String token) { this.uris = uris; @@ -155,8 +158,39 @@ public HecConfig setHasCustomTrustStore(boolean hasStore) { return this; } + public String kerberosPrincipal() { + return kerberosPrincipal; + } + + public HecConfig setKerberosPrincipal(String kerberosPrincipal) { + this.kerberosPrincipal = kerberosPrincipal; + return this; + } + + public String kerberosUser() { + return kerberosUser; + } + + public HecConfig setKerberosUser(String kerberosUser) { + this.kerberosUser = kerberosUser; + return this; + } + + public String kerberosKeytabLocation() { + return kerberosKeytabLocation; + } + + public HecConfig setKerberosKeytabLocation(String kerberosKeytabLocation) { + this.kerberosKeytabLocation = kerberosKeytabLocation; + return this; + } + public HecConfig setEnableChannelTracking(boolean trackChannel) { enableChannelTracking = trackChannel; return this; } + + public boolean kerberosAuthEnabled() { + return !kerberosPrincipal().isEmpty(); + } } diff --git a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java index 4047ca37..7fe2d9a1 100644 --- a/src/main/java/com/splunk/hecclient/HttpClientBuilder.java +++ b/src/main/java/com/splunk/hecclient/HttpClientBuilder.java @@ -15,11 +15,20 @@ */ package com.splunk.hecclient; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.config.AuthSchemes; import org.apache.http.client.config.CookieSpecs; import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; import org.apache.http.config.SocketConfig; +import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.ssl.SSLContextBuilder; @@ -27,6 +36,10 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.Principal; import java.security.cert.X509Certificate; public final class HttpClientBuilder { @@ -87,6 +100,35 @@ public CloseableHttpClient build() { .build(); } + public CloseableHttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException { + org.apache.http.impl.client.HttpClientBuilder builder = + org.apache.http.impl.client.HttpClientBuilder.create(); + Lookup authSchemeRegistry = RegistryBuilder.create(). + register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build(); + builder.setDefaultAuthSchemeRegistry(authSchemeRegistry); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() { + @Override + public Principal getUserPrincipal() { + return null; + } + @Override + public String getPassword() { + return null; + } + }); + builder.setDefaultCredentialsProvider(credentialsProvider); + SSLContextBuilder sslContextBuilderbuilder = new SSLContextBuilder(); + sslContextBuilderbuilder.loadTrustMaterial(null, (chain, authType) -> true); + SSLConnectionSocketFactory sslsf = new + SSLConnectionSocketFactory( + sslContextBuilderbuilder.build(), NoopHostnameVerifier.INSTANCE); + + builder.setSSLSocketFactory(sslsf); + CloseableHttpClient httpClient = builder.build(); + return httpClient; + } + private SSLConnectionSocketFactory getSSLConnectionFactory() { if (disableSSLCertVerification) { return getUnsecureSSLConnectionSocketFactory(); diff --git a/src/main/java/com/splunk/hecclient/Indexer.java b/src/main/java/com/splunk/hecclient/Indexer.java index 43611c23..d4f481a5 100644 --- a/src/main/java/com/splunk/hecclient/Indexer.java +++ b/src/main/java/com/splunk/hecclient/Indexer.java @@ -15,8 +15,10 @@ */ package com.splunk.hecclient; +import com.splunk.kafka.connect.SplunkSinkConnectorConfig; import org.apache.http.Header; import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.protocol.HttpClientContext; @@ -25,15 +27,27 @@ import org.apache.http.message.BasicHeader; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; +import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; +import java.security.Principal; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; final class Indexer implements IndexerInf { private static final Logger log = LoggerFactory.getLogger(Indexer.class); + private HecConfig hecConfig; + private Configuration config; private CloseableHttpClient httpClient; private HttpContext context; private String baseUrl; @@ -47,14 +61,18 @@ final class Indexer implements IndexerInf { private long backPressureThreshhold = 60 * 1000; // 1 min // Indexer doesn't own client, ack poller - public Indexer(String baseUrl, String hecToken, CloseableHttpClient client, Poller poller) { + public Indexer( + String baseUrl, + CloseableHttpClient client, + Poller poller, + HecConfig config) { this.httpClient = client; this.baseUrl = baseUrl; - this.hecToken = hecToken; + this.hecToken = config.getToken(); this.poller = poller; this.context = HttpClientContext.create(); backPressure = 0; - + this.hecConfig = config; channel = new HecChannel(this); // Init headers @@ -137,17 +155,68 @@ public boolean send(final EventBatch batch) { @Override public synchronized String executeHttpRequest(final HttpUriRequest req) { CloseableHttpResponse resp; - try { - resp = httpClient.execute(req, context); - } catch (Exception ex) { - logBackPressure(); - log.error("encountered io exception", ex); - throw new HecException("encountered exception when post data", ex); + if (hecConfig.kerberosAuthEnabled()) { + if (config == null) { + defineKerberosConfigs(); + } + Set princ = new HashSet(1); + princ.add(new KerberosPrincipal(hecConfig.kerberosUser())); + Subject sub = new Subject(false, princ, new HashSet(), new HashSet()); + try { + LoginContext lc = new LoginContext("", sub, null, config); + lc.login(); + Subject serviceSubject = lc.getSubject(); + resp = Subject.doAs(serviceSubject, new PrivilegedAction() { + @Override + public CloseableHttpResponse run() { + try { + return httpClient.execute(req, context); + } catch (IOException ex) { + logBackPressure(); + throw new HecException("Encountered exception while posting data.", ex); + } + } + }); + } catch (Exception le) { + throw new HecException( + "Encountered exception while authenticating via Kerberos.", le); + } + } else { + try { + resp = httpClient.execute(req, context); + } catch (Exception ex) { + logBackPressure(); + throw new HecException("encountered exception when post data", ex); + } } - return readAndCloseResponse(resp); } + private void defineKerberosConfigs() { + config = new Configuration() { + @SuppressWarnings("serial") + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return new AppConfigurationEntry[]{new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, new HashMap() { + { + put("useTicketCache", "false"); + put("useKeyTab", "true"); + put("keyTab", hecConfig.kerberosKeytabLocation()); + //Krb5 in GSS API needs to be refreshed so it does not throw the error + //Specified version of key is not available + put("refreshKrb5Config", "true"); + put("principal", hecConfig.kerberosPrincipal()); + put("storeKey", "false"); + put("doNotPrompt", "true"); + put("isInitiator", "true"); + put("debug", "true"); + } + })}; + } + }; + } + private String readAndCloseResponse(CloseableHttpResponse resp) { String respPayload; HttpEntity entity = resp.getEntity(); diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index 70865868..671dcca9 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -16,6 +16,10 @@ package com.splunk.kafka.connect; import com.splunk.kafka.connect.VersionUtils; + +import io.confluent.connect.utils.validators.all.ConfigValidation; + +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; @@ -66,4 +70,13 @@ public String version() { public ConfigDef config() { return SplunkSinkConnectorConfig.conf(); } + + @Override + public Config validate(Map connectorConfigs) { + return new ConfigValidation( + config(), + connectorConfigs, + SplunkSinkConnectorConfig::validateKerberosConfigs + ).validate(); + } } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index 0299b075..ad053805 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -16,6 +16,10 @@ package com.splunk.kafka.connect; import com.splunk.hecclient.HecConfig; + +import io.confluent.connect.utils.Strings; +import io.confluent.connect.utils.validators.all.ConfigValidationResult; + import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.common.config.AbstractConfig; @@ -36,6 +40,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String INDEX_CONF = "splunk.indexes"; static final String SOURCE_CONF = "splunk.sources"; static final String SOURCETYPE_CONF = "splunk.sourcetypes"; + // Kerberos config + static final String KERBEROS_PRINCIPAL_CONF = "kerb.principal"; + static final String KERBEROS_USER_CONF = "kerb.user"; + static final String KERBEROS_KEYTAB_LOCATION_CONF = "kerb.keytab.location"; static final String TOTAL_HEC_CHANNEL_CONF = "splunk.hec.total.channels"; static final String MAX_HTTP_CONNECTION_PER_CHANNEL_CONF = "splunk.hec.max.http.connection.per.channel"; @@ -161,6 +169,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { static final String HEADER_SOURCETYPE_DOC = "Header to use for Splunk Header Sourcetype"; static final String HEADER_HOST_DOC = "Header to use for Splunk Header Host"; + static final String KERBEROS_PRINCIPAL_DOC = "Kerberos principal"; + static final String KERBEROS_USER_DOC = "Kerberos user"; + static final String KERBEROS_KEYTAB_LOCATION_DOC = "Kerberos keytab"; + final String splunkToken; final String splunkURI; final Map> topicMetas; @@ -203,6 +215,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { final String headerSourcetype; final String headerHost; + final String kerberosPrincipal; + final String kerberosUser; + final String kerberosKeytabLocation; + SplunkSinkConnectorConfig(Map taskConfig) { super(conf(), taskConfig); splunkToken = getPassword(TOKEN_CONF).value(); @@ -239,6 +255,9 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig { headerSource = getString(HEADER_SOURCE_CONF); headerSourcetype = getString(HEADER_SOURCETYPE_CONF); headerHost = getString(HEADER_HOST_CONF); + kerberosPrincipal = getString(KERBEROS_PRINCIPAL_CONF); + kerberosUser = getString(KERBEROS_USER_CONF); + kerberosKeytabLocation = getString(KERBEROS_KEYTAB_LOCATION_CONF); } public static ConfigDef conf() { @@ -274,7 +293,10 @@ public static ConfigDef conf() { .define(HEADER_INDEX_CONF, ConfigDef.Type.STRING, "splunk.header.index", ConfigDef.Importance.MEDIUM, HEADER_INDEX_DOC) .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) - .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC); + .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC) + .define(KERBEROS_PRINCIPAL_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_PRINCIPAL_DOC) + .define(KERBEROS_USER_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_USER_DOC) + .define(KERBEROS_KEYTAB_LOCATION_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, KERBEROS_KEYTAB_LOCATION_DOC); } /** Configuration Method to setup all settings related to Splunk HEC Client @@ -292,7 +314,10 @@ public HecConfig getHecConfig() { .setEnableChannelTracking(trackData) .setTrustStorePath(trustStorePath) .setTrustStorePassword(trustStorePassword) - .setHasCustomTrustStore(hasTrustStorePath); + .setHasCustomTrustStore(hasTrustStorePath) + .setKerberosPrincipal(kerberosPrincipal) + .setKerberosUser(kerberosUser) + .setKerberosKeytabLocation(kerberosKeytabLocation); return config; } @@ -405,4 +430,38 @@ private Map> initMetaMap(Map taskCon } return metaMap; } + + protected static void validateKerberosConfigs(Map configs, ConfigValidationResult result) { + String kerberosKeytabLocation = + (String) configs.get(KERBEROS_KEYTAB_LOCATION_CONF); + String kerberosUser = + (String) configs.get(KERBEROS_USER_CONF); + String kerberosPrincipal = (String) configs.get(KERBEROS_PRINCIPAL_CONF); + + if (Strings.isNotEmpty(kerberosKeytabLocation) + && Strings.isNotEmpty(kerberosPrincipal) + && Strings.isNotEmpty(kerberosUser) + ) { + return; + } + + if (kerberosKeytabLocation.isEmpty() + && kerberosPrincipal.isEmpty() + && kerberosUser.isEmpty() + ) { + return; + } + + result.recordErrors( + String.format( + "%s, %s and %s are required to be configured for Kerberos authentication. ", + KERBEROS_USER_CONF, + KERBEROS_PRINCIPAL_CONF, + KERBEROS_KEYTAB_LOCATION_CONF + ), + KERBEROS_USER_CONF, + KERBEROS_PRINCIPAL_CONF, + KERBEROS_KEYTAB_LOCATION_CONF + ); + } } diff --git a/src/test/java/com/splunk/hecclient/IndexerTest.java b/src/test/java/com/splunk/hecclient/IndexerTest.java index f9ef3f72..883e2727 100644 --- a/src/test/java/com/splunk/hecclient/IndexerTest.java +++ b/src/test/java/com/splunk/hecclient/IndexerTest.java @@ -15,6 +15,8 @@ */ package com.splunk.hecclient; +import java.util.Collections; + import org.apache.http.Header; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; @@ -23,10 +25,13 @@ public class IndexerTest { private static final String baseUrl = "https://localhost:8088"; private static final String token = "mytoken"; + private static final HecConfig hecConfig = + new HecConfig(Collections.emptyList(), "") + .setKerberosPrincipal(""); @Test public void getHeaders() { - Indexer indexer = new Indexer(baseUrl, token, null, null); + Indexer indexer = new Indexer(baseUrl, null, null, hecConfig); Header[] headers = indexer.getHeaders(); Assert.assertEquals(3, headers.length); @@ -63,7 +68,7 @@ public void getHeaders() { @Test public void getterSetter() { - Indexer indexer = new Indexer(baseUrl, token, null, null); + Indexer indexer = new Indexer(baseUrl, null, null,hecConfig); Assert.assertEquals(baseUrl, indexer.getBaseUrl()); Assert.assertEquals(token, indexer.getToken()); @@ -74,7 +79,7 @@ public void getterSetter() { @Test public void toStr() { - Indexer indexer = new Indexer(baseUrl, token, null, null); + Indexer indexer = new Indexer(baseUrl, null, null, hecConfig); Assert.assertEquals(baseUrl, indexer.toString()); } @@ -87,7 +92,7 @@ public void sendWithSuccess() { } PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller); + Indexer indexer = new Indexer(baseUrl, client, poller, hecConfig); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertTrue(result); @@ -139,7 +144,7 @@ public void sendWithReadError() { private Indexer assertFailure(CloseableHttpClient client) { PollerMock poller = new PollerMock(); - Indexer indexer = new Indexer(baseUrl, token, client, poller); + Indexer indexer = new Indexer(baseUrl, client, poller, hecConfig); EventBatch batch = UnitUtil.createBatch(); boolean result = indexer.send(batch); Assert.assertFalse(result); diff --git a/src/test/java/com/splunk/hecclient/UnitUtil.java b/src/test/java/com/splunk/hecclient/UnitUtil.java index eff9181f..287c003f 100644 --- a/src/test/java/com/splunk/hecclient/UnitUtil.java +++ b/src/test/java/com/splunk/hecclient/UnitUtil.java @@ -24,7 +24,8 @@ public class UnitUtil { public static HecConfig createHecConfig() { - return new HecConfig(Arrays.asList("https://dummyhost:8088"), "token"); + return new HecConfig(Arrays.asList("https://dummyhost:8088"), "token") + .setKerberosPrincipal(""); } public static EventBatch createBatch() {