Skip to content

Commit 0757184

Browse files
authored
Merge pull request #328 from splunk/kerberos
Add kerberos authentication support
2 parents 9006a23 + f88b270 commit 0757184

File tree

10 files changed

+320
-20
lines changed

10 files changed

+320
-20
lines changed

README.md

+8
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ Use the below schema to configure Splunk Connect for Kafka
127127
"splunk.hec.json.event.formatted": "<true|false>",
128128
"splunk.hec.ssl.trust.store.path": "<Java KeyStore location>",
129129
"splunk.hec.ssl.trust.store.password": "<Java KeyStore password>"
130+
"kerberos.user.principal": "<The Kerberos user principal the connector may use to authenticate with Kerberos>",
131+
"kerberos.keytab.path": "<The path to the keytab file to use for authentication with Kerberos>"
130132
}
131133
}
132134
```
@@ -198,6 +200,12 @@ Use the below schema to configure Splunk Connect for Kafka
198200
| `splunk.header.sourcetype` | This setting specifies the Kafka record header key which will determine the sourcetype value for the Splunk event. This setting is only applicable when `splunk.header.support` is set to `true`. | `splunk.header.sourcetype` |
199201
| `splunk.header.host` | This setting specifies the Kafka record header key which will determine the host value for the Splunk event. This setting is only applicable when `splunk.header.support` is set to `true`. | `splunk.header.host` |
200202

203+
### Kerberos Parameters
204+
| Name | Description | Default Value |
205+
|-------- |----------------------------|-----------------------|
206+
| `kerberos.user.principal` | The Kerberos user principal the connector may use to authenticate with Kerberos. | `""` |
207+
| `kerberos.keytab.path` | The path to the keytab file to use for authentication with Kerberos. | `""` |
208+
201209
## Load balancing
202210

203211
See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.

src/main/java/com/splunk/hecclient/Hec.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import javax.net.ssl.SSLContext;
3333
import javax.net.ssl.TrustManagerFactory;
34+
import org.apache.kafka.connect.errors.ConnectException;
3435

3536
/**
3637
* Hec is the central class which will construct the HTTP Event Collector Client to send messages to Splunk.
@@ -196,7 +197,7 @@ public static HecAckPoller createPoller(HecConfig config, PollerCallback callbac
196197
public Hec(HecConfig config, CloseableHttpClient httpClient, Poller poller, LoadBalancerInf loadBalancer) {
197198
for (int i = 0; i < config.getTotalChannels(); ) {
198199
for (String uri : config.getUris()) {
199-
Indexer indexer = new Indexer(uri, config.getToken(), httpClient, poller);
200+
Indexer indexer = new Indexer(uri, httpClient, poller, config);
200201
indexer.setKeepAlive(config.getHttpKeepAlive());
201202
indexer.setBackPressureThreshold(config.getBackoffThresholdSeconds());
202203
loadBalancer.add(uri, indexer.getChannel().setTracking(config.getEnableChannelTracking()));
@@ -266,6 +267,14 @@ public final void close() {
266267
public static CloseableHttpClient createHttpClient(final HecConfig config) {
267268
int poolSizePerDest = config.getMaxHttpConnectionPerChannel();
268269

270+
if (config.kerberosAuthEnabled()) {
271+
try {
272+
return new HttpClientBuilder().buildKerberosClient();
273+
} catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException ex) {
274+
throw new ConnectException("Unable to build Kerberos Client", ex);
275+
}
276+
}
277+
269278
// Code block for default client construction
270279
if(!config.getHasCustomTrustStore() &&
271280
StringUtils.isBlank(config.getTrustStorePath()) &&

src/main/java/com/splunk/hecclient/HecConfig.java

+24
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public final class HecConfig {
3636
private String trustStorePath;
3737
private String trustStorePassword;
3838
private int lbPollInterval = 120; // in seconds
39+
private String kerberosPrincipal;
40+
private String kerberosKeytabPath;
3941

4042
public HecConfig(List<String> uris, String token) {
4143
this.uris = uris;
@@ -178,4 +180,26 @@ public HecConfig setBackoffThresholdSeconds(int backoffSeconds) {
178180
backoffThresholdSeconds = backoffSeconds * 1000;
179181
return this;
180182
}
183+
184+
public String kerberosPrincipal() {
185+
return kerberosPrincipal;
186+
}
187+
188+
public HecConfig setKerberosPrincipal(String kerberosPrincipal) {
189+
this.kerberosPrincipal = kerberosPrincipal;
190+
return this;
191+
}
192+
193+
public String kerberosKeytabLocation() {
194+
return kerberosKeytabPath;
195+
}
196+
197+
public HecConfig setKerberosKeytabPath(String kerberosKeytabPath) {
198+
this.kerberosKeytabPath = kerberosKeytabPath;
199+
return this;
200+
}
201+
202+
public boolean kerberosAuthEnabled() {
203+
return !kerberosPrincipal().isEmpty();
204+
}
181205
}

src/main/java/com/splunk/hecclient/HttpClientBuilder.java

+45
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@
1515
*/
1616
package com.splunk.hecclient;
1717

18+
import java.security.KeyManagementException;
19+
import java.security.KeyStoreException;
20+
import java.security.NoSuchAlgorithmException;
21+
import java.security.Principal;
22+
import org.apache.http.auth.AuthSchemeProvider;
23+
import org.apache.http.auth.AuthScope;
24+
import org.apache.http.auth.Credentials;
25+
import org.apache.http.client.config.AuthSchemes;
1826
import org.apache.http.client.config.CookieSpecs;
1927
import org.apache.http.client.config.RequestConfig;
28+
import org.apache.http.config.Lookup;
29+
import org.apache.http.config.RegistryBuilder;
2030
import org.apache.http.config.SocketConfig;
31+
import org.apache.http.conn.ssl.NoopHostnameVerifier;
2132
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
2233
import org.apache.http.conn.ssl.TrustStrategy;
34+
import org.apache.http.impl.auth.SPNegoSchemeFactory;
35+
import org.apache.http.impl.client.BasicCredentialsProvider;
2336
import org.apache.http.impl.client.CloseableHttpClient;
2437
import org.apache.http.impl.client.HttpClients;
2538
import org.apache.http.ssl.SSLContextBuilder;
@@ -29,6 +42,8 @@
2942
import javax.net.ssl.SSLSession;
3043
import java.security.cert.X509Certificate;
3144

45+
46+
3247
public final class HttpClientBuilder {
3348
private int maxConnectionPoolSizePerDestination = 4;
3449
private int maxConnectionPoolSize = 4 * 2;
@@ -87,6 +102,36 @@ public CloseableHttpClient build() {
87102
.build();
88103
}
89104

105+
public CloseableHttpClient buildKerberosClient() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
106+
org.apache.http.impl.client.HttpClientBuilder builder =
107+
org.apache.http.impl.client.HttpClientBuilder.create();
108+
Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create().
109+
register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build();
110+
builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
111+
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
112+
credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() {
113+
@Override
114+
public Principal getUserPrincipal() {
115+
return null;
116+
}
117+
@Override
118+
public String getPassword() {
119+
return null;
120+
}
121+
});
122+
builder.setDefaultCredentialsProvider(credentialsProvider);
123+
SSLContextBuilder sslContextBuilderbuilder = new SSLContextBuilder();
124+
sslContextBuilderbuilder.loadTrustMaterial(null, (chain, authType) -> true);
125+
SSLConnectionSocketFactory sslsf = new
126+
SSLConnectionSocketFactory(
127+
sslContextBuilderbuilder.build(), NoopHostnameVerifier.INSTANCE);
128+
129+
builder.setSSLSocketFactory(sslsf);
130+
CloseableHttpClient httpClient = builder.build();
131+
return httpClient;
132+
}
133+
134+
90135
private SSLConnectionSocketFactory getSSLConnectionFactory() {
91136
if (disableSSLCertVerification) {
92137
return getUnsecureSSLConnectionSocketFactory();

src/main/java/com/splunk/hecclient/Indexer.java

+81-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,18 @@
1919
import com.fasterxml.jackson.databind.ObjectMapper;
2020
import com.fasterxml.jackson.databind.node.ObjectNode;
2121
import com.splunk.kafka.connect.VersionUtils;
22+
import com.sun.security.auth.module.Krb5LoginModule;
23+
import java.security.Principal;
24+
import java.security.PrivilegedAction;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import javax.security.auth.Subject;
30+
import javax.security.auth.kerberos.KerberosPrincipal;
31+
import javax.security.auth.login.AppConfigurationEntry;
32+
import javax.security.auth.login.Configuration;
33+
import javax.security.auth.login.LoginContext;
2234
import org.apache.http.Header;
2335
import org.apache.http.HttpEntity;
2436
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -38,6 +50,8 @@ final class Indexer implements IndexerInf {
3850
private static final Logger log = LoggerFactory.getLogger(Indexer.class);
3951
private static final ObjectMapper jsonMapper = new ObjectMapper();
4052

53+
private HecConfig hecConfig;
54+
private Configuration config;
4155
private CloseableHttpClient httpClient;
4256
private HttpContext context;
4357
private String baseUrl;
@@ -51,10 +65,12 @@ final class Indexer implements IndexerInf {
5165
private long backPressureThreshold = 60 * 1000; // 1 min
5266

5367
// Indexer doesn't own client, ack poller
54-
public Indexer(String baseUrl, String hecToken, CloseableHttpClient client, Poller poller) {
68+
public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig config) {
5569
this.httpClient = client;
5670
this.baseUrl = baseUrl;
5771
this.hecToken = hecToken;
72+
this.hecConfig = config;
73+
this.hecToken = config.getToken();
5874
this.poller = poller;
5975
this.context = HttpClientContext.create();
6076
backPressure = 0;
@@ -147,17 +163,73 @@ public boolean send(final EventBatch batch) {
147163
@Override
148164
public synchronized String executeHttpRequest(final HttpUriRequest req) {
149165
CloseableHttpResponse resp;
150-
try {
151-
resp = httpClient.execute(req, context);
152-
} catch (Exception ex) {
153-
logBackPressure();
154-
log.error("encountered io exception", ex);
155-
throw new HecException("encountered exception when post data", ex);
166+
if (hecConfig.kerberosAuthEnabled()) {
167+
if (config == null) {
168+
defineKerberosConfigs();
169+
}
170+
Set<Principal> principals = new HashSet<>(1);
171+
principals.add(new KerberosPrincipal(hecConfig.kerberosPrincipal()));
172+
Subject subject = new Subject(false, principals, new HashSet<>(), new HashSet<>());
173+
try {
174+
LoginContext lc = new LoginContext("SplunkSinkConnector", subject, null, config);
175+
lc.login();
176+
Subject serviceSubject = lc.getSubject();
177+
resp = Subject.doAs(serviceSubject, (PrivilegedAction<CloseableHttpResponse>) () -> {
178+
try {
179+
return httpClient.execute(req, context);
180+
} catch (IOException ex) {
181+
logBackPressure();
182+
throw new HecException("Encountered exception while posting data.", ex);
183+
}
184+
});
185+
} catch (Exception le) {
186+
throw new HecException(
187+
"Encountered exception while authenticating via Kerberos.", le);
188+
}
189+
} else {
190+
try {
191+
resp = httpClient.execute(req, context);
192+
} catch (Exception ex) {
193+
logBackPressure();
194+
throw new HecException("encountered exception when post data", ex);
195+
}
156196
}
157197

158198
return readAndCloseResponse(resp);
159199
}
160200

201+
202+
/**
203+
* Creates the Kerberos configurations.
204+
*
205+
* @return map of kerberos configs
206+
*/
207+
private Map<String, Object> kerberosConfigMap() {
208+
Map<String, Object> configs = new HashMap<>();
209+
configs.put("useTicketCache", "true");
210+
configs.put("renewTGT", "true");
211+
configs.put("useKeyTab", "true");
212+
configs.put("keyTab", hecConfig.kerberosKeytabLocation());
213+
configs.put("refreshKrb5Config", "true");
214+
configs.put("principal", hecConfig.kerberosPrincipal());
215+
configs.put("storeKey", "false");
216+
configs.put("doNotPrompt", "true");
217+
return configs;
218+
}
219+
220+
private void defineKerberosConfigs() {
221+
config = new Configuration() {
222+
@Override
223+
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
224+
return new AppConfigurationEntry[]{
225+
new AppConfigurationEntry(Krb5LoginModule.class.getName(),
226+
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, kerberosConfigMap())
227+
};
228+
}
229+
};
230+
}
231+
232+
161233
private String readAndCloseResponse(CloseableHttpResponse resp) {
162234
String respPayload;
163235
HttpEntity entity = resp.getEntity();
@@ -181,6 +253,8 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {
181253
poller.setStickySessionToTrue();
182254
}
183255

256+
257+
184258
int status = resp.getStatusLine().getStatusCode();
185259
// FIXME 503 server is busy backpressure
186260
if (status != 200 && status != 201) {

src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java

+46-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,15 @@
1515
*/
1616
package com.splunk.kafka.connect;
1717

18-
import com.splunk.kafka.connect.VersionUtils;
18+
import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONF;
19+
import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_USER_PRINCIPAL_CONF;
20+
21+
import java.util.function.Function;
22+
import java.util.stream.Collectors;
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.kafka.common.config.Config;
1925
import org.apache.kafka.common.config.ConfigDef;
26+
import org.apache.kafka.common.config.ConfigValue;
2027
import org.apache.kafka.connect.connector.Task;
2128
import org.apache.kafka.connect.sink.SinkConnector;
2229

@@ -30,6 +37,8 @@
3037
public final class SplunkSinkConnector extends SinkConnector {
3138
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
3239
private Map<String, String> taskConfig;
40+
private Map<String, ConfigValue> values;
41+
private List<ConfigValue> validations;
3342

3443
@Override
3544
public void start(Map<String, String> taskConfig) {
@@ -66,4 +75,40 @@ public String version() {
6675
public ConfigDef config() {
6776
return SplunkSinkConnectorConfig.conf();
6877
}
78+
79+
80+
@Override
81+
public Config validate(final Map<String, String> connectorConfigs) {
82+
Config config = super.validate(connectorConfigs);
83+
validations = config.configValues();
84+
values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
85+
86+
validateKerberosConfigs(connectorConfigs);
87+
return new Config(validations);
88+
}
89+
90+
void validateKerberosConfigs(final Map<String, String> configs) {
91+
final String keytab = configs.getOrDefault(KERBEROS_KEYTAB_PATH_CONF, "");
92+
final String principal = configs.getOrDefault(KERBEROS_USER_PRINCIPAL_CONF, "");
93+
94+
if (StringUtils.isNotEmpty(keytab) && StringUtils.isNotEmpty(principal)) {
95+
return;
96+
}
97+
98+
if (keytab.isEmpty() && principal.isEmpty()) {
99+
return;
100+
}
101+
102+
String errorMessage = String.format(
103+
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
104+
KERBEROS_KEYTAB_PATH_CONF,
105+
KERBEROS_USER_PRINCIPAL_CONF
106+
);
107+
addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
108+
addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
109+
}
110+
111+
private void addErrorMessage(String property, String error) {
112+
values.get(property).addErrorMessage(error);
113+
}
69114
}

0 commit comments

Comments
 (0)