Skip to content

Support skip verify ssl certificate and hostname #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public abstract class ElasticsearchSinkBuilderBase<
private String password;
private String connectionPathPrefix;
private Integer connectionTimeout;
private Boolean connectionSkipVerifySsl;
private Integer connectionRequestTimeout;
private Integer socketTimeout;
private FailureHandler failureHandler = new DefaultFailureHandler();
Expand Down Expand Up @@ -250,6 +251,11 @@ public B setConnectionTimeout(int timeout) {
return self();
}

public B setConnectionSkipVerifySsl(boolean skip) {
this.connectionSkipVerifySsl = skip;
return self();
}

/**
* Sets the timeout for waiting for data or, put differently, a maximum period inactivity
* between two consecutive data packets.
Expand Down Expand Up @@ -336,6 +342,7 @@ private NetworkClientConfig buildNetworkClientConfig() {
connectionPathPrefix,
connectionRequestTimeout,
connectionTimeout,
connectionSkipVerifySsl,
socketTimeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand All @@ -47,6 +50,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;

import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
Expand Down Expand Up @@ -172,6 +178,24 @@ private static RestClientBuilder configureRestClientBuilder(
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}

if (networkClientConfig.getConnectionSkipVerifySsl()) {
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLHostnameVerifier(
NoopHostnameVerifier.INSTANCE));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
try {
return httpClientBuilder
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setSSLContext(SSLContexts.custom()
.loadTrustMaterial(null, TrustAllStrategy.INSTANCE)
.build());
} catch (NoSuchAlgorithmException | KeyManagementException |
KeyStoreException e) {
throw new RuntimeException(e);
}
});
}

if (networkClientConfig.getConnectionRequestTimeout() != null
|| networkClientConfig.getConnectionTimeout() != null
|| networkClientConfig.getSocketTimeout() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class NetworkClientConfig implements Serializable {
@Nullable private final String connectionPathPrefix;
@Nullable private final Integer connectionRequestTimeout;
@Nullable private final Integer connectionTimeout;
@Nullable private final Boolean connectionSkipVerifySsl;
@Nullable private final Integer socketTimeout;

NetworkClientConfig(
Expand All @@ -37,12 +38,14 @@ class NetworkClientConfig implements Serializable {
@Nullable String connectionPathPrefix,
@Nullable Integer connectionRequestTimeout,
@Nullable Integer connectionTimeout,
@Nullable Boolean connectionSkipVerifySsl,
@Nullable Integer socketTimeout) {
this.username = username;
this.password = password;
this.connectionPathPrefix = connectionPathPrefix;
this.connectionRequestTimeout = connectionRequestTimeout;
this.connectionTimeout = connectionTimeout;
this.connectionSkipVerifySsl = connectionSkipVerifySsl;
this.socketTimeout = socketTimeout;
}

Expand Down Expand Up @@ -71,6 +74,11 @@ public Integer getSocketTimeout() {
return socketTimeout;
}

@Nullable
public Boolean getConnectionSkipVerifySsl() {
return connectionSkipVerifySsl;
}

@Nullable
public String getConnectionPathPrefix() {
return connectionPathPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECT_SKIP_VERIFY_SSL;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
Expand Down Expand Up @@ -116,6 +117,10 @@ public Optional<Duration> getConnectionTimeout() {
return config.getOptional(CONNECTION_TIMEOUT);
}

public Optional<Boolean> getConnectionSkipVerifySsl() {
return config.getOptional(CONNECT_SKIP_VERIFY_SSL);
}

public Optional<Duration> getSocketTimeout() {
return config.getOptional(SOCKET_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,10 @@ public class ElasticsearchConnectorOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");

public static final ConfigOption<Boolean> CONNECT_SKIP_VERIFY_SSL =
ConfigOptions.key("connection.skip-verify-ssl")
.booleanType()
.defaultValue(false)
.withDescription("Skip verify ssl certificate or not");
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
builder.setSocketTimeout((int) config.getSocketTimeout().get().getSeconds());
}

if (config.getConnectionSkipVerifySsl().isPresent()) {
builder.setConnectionSkipVerifySsl(config.getConnectionSkipVerifySsl().get());
}

return SinkV2Provider.of(builder.build(), config.getParallelism().orElse(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ public Optional<String> getPathPrefix() {
return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
}

public Optional<Boolean> getSkipVerifySsl() {
return config.getOptional(ElasticsearchConnectorOptions.CONNECT_SKIP_VERIFY_SSL);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public class ElasticsearchConnectorOptions {
.noDefaultValue()
.withDescription("Elasticsearch hosts to connect to.");

public static final ConfigOption<Boolean> CONNECT_SKIP_VERIFY_SSL =
ConfigOptions.key("connection.skip-verify-ssl")
.booleanType()
.defaultValue(false)
.withDescription("Skip verify ssl certificate or not");

public static final ConfigOption<String> INDEX_OPTION =
ConfigOptions.key("index")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private static ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
bulkProcessorConfig,
new TestBulkProcessorBuilderFactory(),
new DefaultBulkResponseInspector(),
new NetworkClientConfig(null, null, null, null, null, null),
new NetworkClientConfig(null, null, null, null, null, null, null),
metricGroup,
new TestMailbox());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand All @@ -46,6 +49,9 @@

import javax.annotation.Nullable;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -167,10 +173,12 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
new AuthRestClientFactory(
config.getPathPrefix().orElse(null),
config.getUsername().get(),
config.getPassword().get()));
config.getPassword().get(), config.getSkipVerifySsl().get()));
} else {
builder.setRestClientFactory(
new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
new DefaultRestClientFactory(
config.getPathPrefix().orElse(null),
config.getSkipVerifySsl().get()));
}

final ElasticsearchSink<RowData> sink = builder.build();
Expand Down Expand Up @@ -198,16 +206,32 @@ public String asSummaryString() {
static class DefaultRestClientFactory implements RestClientFactory {

private final String pathPrefix;
private final Boolean skipVerifySsl;

public DefaultRestClientFactory(@Nullable String pathPrefix) {
public DefaultRestClientFactory(@Nullable String pathPrefix, Boolean skipVerifySsl) {
this.pathPrefix = pathPrefix;
this.skipVerifySsl = skipVerifySsl;
}

@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
if (pathPrefix != null) {
restClientBuilder.setPathPrefix(pathPrefix);
}
if (skipVerifySsl) {
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> {
try {
return httpClientBuilder.setSSLContext(SSLContexts
.custom()
.loadTrustMaterial(null, TrustAllStrategy.INSTANCE)
.build());
} catch (NoSuchAlgorithmException | KeyManagementException |
KeyStoreException e) {
throw new RuntimeException(e);
}
});
}
}

@Override
Expand Down Expand Up @@ -236,12 +260,17 @@ static class AuthRestClientFactory implements RestClientFactory {
private final String username;
private final String password;
private transient CredentialsProvider credentialsProvider;
private final Boolean skipVerifySsl;

public AuthRestClientFactory(
@Nullable String pathPrefix, String username, String password) {
@Nullable String pathPrefix,
String username,
String password,
Boolean skipVerifySsl) {
this.pathPrefix = pathPrefix;
this.password = password;
this.username = username;
this.skipVerifySsl = skipVerifySsl;
}

@Override
Expand All @@ -254,10 +283,28 @@ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(username, password));
}
restClientBuilder.setHttpClientConfigCallback(
httpAsyncClientBuilder ->
httpAsyncClientBuilder.setDefaultCredentialsProvider(
credentialsProvider));
if (skipVerifySsl) {
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
try {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setSSLContext(SSLContexts
.custom()
.loadTrustMaterial(null, TrustAllStrategy.INSTANCE)
.build());
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
} catch (KeyManagementException e) {
throw new RuntimeException(e);
} catch (KeyStoreException e) {
throw new RuntimeException(e);
}
});
} else {
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
new Elasticsearch7DynamicSink.AuthRestClientFactory(
config.getPathPrefix().orElse(null),
config.getUsername().get(),
config.getPassword().get());
config.getPassword().get(), config.getSkipVerifySsl().get());
} else {
restClientFactory =
new Elasticsearch7DynamicSink.DefaultRestClientFactory(
config.getPathPrefix().orElse(null));
config.getPathPrefix().orElse(null), config.getSkipVerifySsl().get());
}

Elasticsearch7ApiCallBridge elasticsearch7ApiCallBridge =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECT_SKIP_VERIFY_SSL;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class Elasticsearch7DynamicTableFactory
BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
BULK_FLUSH_BACKOFF_DELAY_OPTION,
CONNECTION_PATH_PREFIX,
CONNECT_SKIP_VERIFY_SSL,
FORMAT_OPTION,
PASSWORD_OPTION,
USERNAME_OPTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testBuilder() {
verify(provider.builderSpy).setBulkFlushMaxSizeMb(1);
verify(provider.builderSpy)
.setRestClientFactory(
new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp"));
new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp", true));
verify(provider.sinkSpy).disableFlushOnCheckpoint();
}

Expand Down Expand Up @@ -124,7 +124,8 @@ public void testDefaultConfig() {
verify(provider.builderSpy).setBulkFlushMaxActions(1000);
verify(provider.builderSpy).setBulkFlushMaxSizeMb(2);
verify(provider.builderSpy)
.setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory(null));
.setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory(
null, true));
verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
}

Expand Down Expand Up @@ -160,7 +161,7 @@ public void testAuthConfig() {
verify(provider.builderSpy)
.setRestClientFactory(
new Elasticsearch7DynamicSink.AuthRestClientFactory(
null, USERNAME, PASSWORD));
null, USERNAME, PASSWORD, true));
verify(provider.sinkSpy, never()).disableFlushOnCheckpoint();
}

Expand Down