Skip to content

Commit

Permalink
[elasticsearch] support token based auth (#46)
Browse files Browse the repository at this point in the history
* [elasticsearch] support token based auth
  • Loading branch information
nicoloboschi authored Mar 23, 2022
1 parent 1028aa7 commit cf13b39
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ public class ElasticSearchConfig implements Serializable {
)
private String password;

@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "The token used by the connector to connect to the elastic search cluster. Only one between basic/token/apiKey authentication mode must be configured."
)
private String token;

@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "The apiKey used by the connector to connect to the elastic search cluster. Only one between basic/token/apiKey authentication mode must be configured."
)
private String apiKey;

@FieldDoc(
required = false,
defaultValue = "-1",
Expand Down Expand Up @@ -320,10 +336,21 @@ public void validate() {
}

if ((StringUtils.isNotEmpty(username) && StringUtils.isEmpty(password))
|| (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password))) {
|| (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password))) {
throw new IllegalArgumentException("Values for both Username & password are required.");
}

boolean basicAuthSet = StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password);
boolean tokenAuthSet = StringUtils.isNotEmpty(token);
boolean apiKeySet = StringUtils.isNotEmpty(apiKey);
if ((basicAuthSet && tokenAuthSet && apiKeySet) ||
(basicAuthSet && tokenAuthSet) ||
(basicAuthSet && apiKeySet) ||
(tokenAuthSet && apiKeySet)
) {
throw new IllegalArgumentException("Only one between basic/token/apiKey authentication mode must be configured.");
}

if (indexNumberOfShards <= 0) {
throw new IllegalArgumentException("indexNumberOfShards must be a strictly positive integer.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
Expand All @@ -32,6 +34,7 @@
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
Expand All @@ -56,6 +59,9 @@
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -98,10 +104,12 @@ public class ConfigCallback implements RestClientBuilder.HttpClientConfigCallbac
org.opensearch.client.RestClientBuilder.HttpClientConfigCallback {
final NHttpClientConnectionManager connectionManager;
final CredentialsProvider credentialsProvider;
final List<Header> defaultHeaders;

public ConfigCallback() {
this.connectionManager = buildConnectionManager(RestClient.this.config);
this.credentialsProvider = buildCredentialsProvider(RestClient.this.config);
this.defaultHeaders = buildDefaultHeaders(RestClient.this.config);
}

@Override
Expand All @@ -113,6 +121,9 @@ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder builder
if (this.credentialsProvider != null) {
builder.setDefaultCredentialsProvider(credentialsProvider);
}
if (defaultHeaders != null) {
builder.setDefaultHeaders(defaultHeaders);
}
return builder;
}

Expand Down Expand Up @@ -184,6 +195,21 @@ private CredentialsProvider buildCredentialsProvider(ElasticSearchConfig config)
new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
return credentialsProvider;
}

private List<Header> buildDefaultHeaders(ElasticSearchConfig config) {
if (StringUtils.isEmpty(config.getToken()) && StringUtils.isEmpty(config.getApiKey())) {
return null;
}
List<Header> headers = new ArrayList<>();
String authHeaderValue;
if (!StringUtils.isEmpty(config.getToken())) {
authHeaderValue = "Bearer " + config.getToken();
} else {
authHeaderValue = "ApiKey " + config.getApiKey();
}
headers.add(new BasicHeader(HttpHeaders.AUTHORIZATION, authHeaderValue));
return Collections.unmodifiableList(headers);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public void close() {
log.warn("Elasticsearch bulk processor close error:", e);
}
client.shutdown();
}

@VisibleForTesting
public ElasticsearchClient getClient() {
return client;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.api.Record;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

@Slf4j
public abstract class ElasticSearchAuthTests extends ElasticSearchTestBase {
public static final String ELASTICPWD = "elastic";

static ElasticsearchContainer container;
public ElasticSearchAuthTests(String elasticImageName) {
super(elasticImageName);
}

@BeforeMethod(alwaysRun = true)
public void initBeforeClass() throws IOException {
if (container != null) {
return;
}
container = createElasticsearchContainer()
.withEnv("xpack.security.enabled", "true")
.withEnv("xpack.security.authc.token.enabled", "true")
.withEnv("xpack.security.authc.api_key.enabled", "true")
.withEnv("xpack.license.self_generated.type", "trial")
.withPassword(ELASTICPWD);
container.start();
}

@AfterClass(alwaysRun = true)
public static void closeAfterClass() {
if (container != null) {
container.close();
}

}

@Test
public void testBasicAuth() throws Exception {
final String indexName = "my-index-" + UUID.randomUUID().toString();
ElasticSearchConfig config = new ElasticSearchConfig();
config.setElasticSearchUrl("http://" + container.getHttpHostAddress());
config.setCompatibilityMode(getCompatibilityMode());
config.setUsername("elastic");
config.setIndexName(indexName);
config.setMaxRetries(1);
config.setBulkEnabled(true);
// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setPassword(ELASTICPWD);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
ensureCalls(client, indexName);
}
}

@Test
public void testTokenAuth() throws Exception {
final String indexName = "my-index-" + UUID.randomUUID().toString();
ElasticSearchConfig config = new ElasticSearchConfig();
config.setElasticSearchUrl("http://" + container.getHttpHostAddress());
config.setCompatibilityMode(getCompatibilityMode());
config.setUsername("elastic");
config.setIndexName(indexName);
config.setMaxRetries(1);
config.setBulkEnabled(true);


config.setPassword(ELASTICPWD);
String token;
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
token = createAuthToken(client, "elastic", ELASTICPWD);
}

config.setUsername(null);
config.setPassword(null);

// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setToken(token);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
ensureCalls(client, indexName);
}
}

@Test
public void testApiKey() throws Exception {
final String indexName = "my-index-" + UUID.randomUUID().toString();
ElasticSearchConfig config = new ElasticSearchConfig();
config.setElasticSearchUrl("http://" + container.getHttpHostAddress());
config.setCompatibilityMode(getCompatibilityMode());
config.setUsername("elastic");
config.setIndexName(indexName);
config.setMaxRetries(1);
config.setBulkEnabled(true);

config.setPassword(ELASTICPWD);
String apiKey;
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
apiKey = createApiKey(client);
}

config.setUsername(null);
config.setPassword(null);

// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setApiKey(apiKey);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
ensureCalls(client, indexName);
}
}

@SneakyThrows
private void ensureCalls(ElasticSearchClient client, String indexName) {
AtomicInteger ackCount = new AtomicInteger();
assertTrue(client.createIndexIfNeeded(indexName));
Record mockRecord = mock(Record.class);
doAnswer(invocation -> ackCount.incrementAndGet()).when(mockRecord).ack();
assertTrue(client.indexDocument(mockRecord, Pair.of("1", "{\"a\":1}")));
assertTrue(client.deleteDocument(mockRecord, "1"));
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkDelete(mockRecord, "1");
client.flush();
assertEquals(ackCount.get(), 4);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public final void defaultValueTest() throws IOException {
assertEquals(config.getTypeName(), "_doc");
assertNull(config.getUsername());
assertNull(config.getPassword());
assertNull(config.getToken());
assertNull(config.getApiKey());
assertEquals(config.getIndexNumberOfReplicas(), 0);
assertEquals(config.getIndexNumberOfShards(), 1);

Expand Down Expand Up @@ -194,6 +196,48 @@ public final void passwordCredentialsTest() throws IOException {
config.validate();
}

@Test
public final void credentialsTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:90902");
map.put("indexName", "myindex");
map.put("username", "elastic");
map.put("password", "go-speedie-go");
map.put("token", "tok");
{
ElasticSearchConfig config = ElasticSearchConfig.load(map);
expectThrows(IllegalArgumentException.class, () -> config.validate());
}
map.put("apiKey", "apiKey");
{
ElasticSearchConfig config = ElasticSearchConfig.load(map);
expectThrows(IllegalArgumentException.class, () -> config.validate());
}
map.remove("token");
{
ElasticSearchConfig config = ElasticSearchConfig.load(map);
expectThrows(IllegalArgumentException.class, () -> config.validate());
}
map.remove("username");
map.remove("password");
{
ElasticSearchConfig config = ElasticSearchConfig.load(map);
config.validate();
}
map.put("token", "tok");
map.remove("apiKey");
{
ElasticSearchConfig config = ElasticSearchConfig.load(map);
config.validate();
}
map.remove("token");

{
ElasticSearchConfig config = ElasticSearchConfig.load(map);
config.validate();
}
}

@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "connectTimeoutInMs must be a positive integer.")
public final void connectTimeoutInMsTest() throws IOException {
Expand Down
Loading

0 comments on commit cf13b39

Please sign in to comment.