diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java index 2904eff0..4c680ec5 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java @@ -58,6 +58,7 @@ public abstract class ElasticsearchSinkBuilderBase< private String password; private String connectionPathPrefix; private Integer connectionTimeout; + private Boolean connectionSkipVerifySsl; private Integer connectionRequestTimeout; private Integer socketTimeout; private FailureHandler failureHandler = new DefaultFailureHandler(); @@ -250,6 +251,11 @@ public B setConnectionTimeout(int timeout) { return self(); } + public B setConnectionSkipVerifySsl(boolean skip) { + this.connectionSkipVerifySsl = skip; + return self(); + } + /** * Sets the timeout for waiting for data or, put differently, a maximum period inactivity * between two consecutive data packets. @@ -336,6 +342,7 @@ private NetworkClientConfig buildNetworkClientConfig() { connectionPathPrefix, connectionRequestTimeout, connectionTimeout, + connectionSkipVerifySsl, socketTimeout); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java index 8f84d873..76524f74 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java @@ -30,7 +30,10 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContexts; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; @@ -47,6 +50,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.List; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -172,6 +178,24 @@ private static RestClientBuilder configureRestClientBuilder( httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } + + if (networkClientConfig.getConnectionSkipVerifySsl()) { + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLHostnameVerifier( + NoopHostnameVerifier.INSTANCE)); + builder.setHttpClientConfigCallback(httpClientBuilder -> { + try { + return httpClientBuilder + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .setSSLContext(SSLContexts.custom() + .loadTrustMaterial(null, TrustAllStrategy.INSTANCE) + .build()); + } catch (NoSuchAlgorithmException | KeyManagementException | + KeyStoreException e) { + throw new RuntimeException(e); + } + }); + } + if (networkClientConfig.getConnectionRequestTimeout() != null || networkClientConfig.getConnectionTimeout() != null || networkClientConfig.getSocketTimeout() != null) { diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java index 5ae05108..7abe93b0 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java @@ -29,6 +29,7 @@ class NetworkClientConfig implements Serializable { @Nullable private final String connectionPathPrefix; @Nullable private final Integer connectionRequestTimeout; @Nullable private final Integer connectionTimeout; + @Nullable private final Boolean connectionSkipVerifySsl; @Nullable private final Integer socketTimeout; NetworkClientConfig( @@ -37,12 +38,14 @@ class NetworkClientConfig implements Serializable { @Nullable String connectionPathPrefix, @Nullable Integer connectionRequestTimeout, @Nullable Integer connectionTimeout, + @Nullable Boolean connectionSkipVerifySsl, @Nullable Integer socketTimeout) { this.username = username; this.password = password; this.connectionPathPrefix = connectionPathPrefix; this.connectionRequestTimeout = connectionRequestTimeout; this.connectionTimeout = connectionTimeout; + this.connectionSkipVerifySsl = connectionSkipVerifySsl; this.socketTimeout = socketTimeout; } @@ -71,6 +74,11 @@ public Integer getSocketTimeout() { return socketTimeout; } + @Nullable + public Boolean getConnectionSkipVerifySsl() { + return connectionSkipVerifySsl; + } + @Nullable public String getConnectionPathPrefix() { return connectionPathPrefix; diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java index 578b1b32..c221f7ba 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java @@ -41,6 +41,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECT_SKIP_VERIFY_SSL; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; @@ -116,6 +117,10 @@ public Optional getConnectionTimeout() { return config.getOptional(CONNECTION_TIMEOUT); } + public Optional getConnectionSkipVerifySsl() { + return config.getOptional(CONNECT_SKIP_VERIFY_SSL); + } + public Optional getSocketTimeout() { return config.getOptional(SOCKET_TIMEOUT); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java index 10ea0ae2..d08643f3 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -145,4 +145,10 @@ public class ElasticsearchConnectorOptions { .enumType(DeliveryGuarantee.class) .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE) .withDescription("Optional delivery guarantee when committing."); + + public static final ConfigOption CONNECT_SKIP_VERIFY_SSL = + ConfigOptions.key("connection.skip-verify-ssl") + .booleanType() + .defaultValue(false) + .withDescription("Skip verify ssl certificate or not"); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java index 0fd389bd..66186f2b 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java @@ -174,6 +174,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { builder.setSocketTimeout((int) config.getSocketTimeout().get().getSeconds()); } + if (config.getConnectionSkipVerifySsl().isPresent()) { + builder.setConnectionSkipVerifySsl(config.getConnectionSkipVerifySsl().get()); + } + return SinkV2Provider.of(builder.build(), config.getParallelism().orElse(null)); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java index 04c76333..4d5ebe2a 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -150,6 +150,10 @@ public Optional getPathPrefix() { return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX); } + public Optional getSkipVerifySsl() { + return config.getOptional(ElasticsearchConnectorOptions.CONNECT_SKIP_VERIFY_SSL); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java index 4838b035..f6907ca7 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -46,6 +46,12 @@ public class ElasticsearchConnectorOptions { .noDefaultValue() .withDescription("Elasticsearch hosts to connect to."); + public static final ConfigOption CONNECT_SKIP_VERIFY_SSL = + ConfigOptions.key("connection.skip-verify-ssl") + .booleanType() + .defaultValue(false) + .withDescription("Skip verify ssl certificate or not"); + public static final ConfigOption INDEX_OPTION = ConfigOptions.key("index") .stringType() diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index 0e12e976..d2991b90 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -327,7 +327,7 @@ private static ElasticsearchWriter> createWriter( bulkProcessorConfig, new TestBulkProcessorBuilderFactory(), new DefaultBulkResponseInspector(), - new NetworkClientConfig(null, null, null, null, null, null), + new NetworkClientConfig(null, null, null, null, null, null, null), metricGroup, new TestMailbox()); } diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java index 1926e445..1ab86eca 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -37,7 +37,10 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContexts; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -46,6 +49,9 @@ import javax.annotation.Nullable; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.time.ZoneId; import java.util.List; import java.util.Objects; @@ -167,10 +173,12 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { new AuthRestClientFactory( config.getPathPrefix().orElse(null), config.getUsername().get(), - config.getPassword().get())); + config.getPassword().get(), config.getSkipVerifySsl().get())); } else { builder.setRestClientFactory( - new DefaultRestClientFactory(config.getPathPrefix().orElse(null))); + new DefaultRestClientFactory( + config.getPathPrefix().orElse(null), + config.getSkipVerifySsl().get())); } final ElasticsearchSink sink = builder.build(); @@ -198,9 +206,11 @@ public String asSummaryString() { static class DefaultRestClientFactory implements RestClientFactory { private final String pathPrefix; + private final Boolean skipVerifySsl; - public DefaultRestClientFactory(@Nullable String pathPrefix) { + public DefaultRestClientFactory(@Nullable String pathPrefix, Boolean skipVerifySsl) { this.pathPrefix = pathPrefix; + this.skipVerifySsl = skipVerifySsl; } @Override @@ -208,6 +218,20 @@ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { if (pathPrefix != null) { restClientBuilder.setPathPrefix(pathPrefix); } + if (skipVerifySsl) { + restClientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> { + try { + return httpClientBuilder.setSSLContext(SSLContexts + .custom() + .loadTrustMaterial(null, TrustAllStrategy.INSTANCE) + .build()); + } catch (NoSuchAlgorithmException | KeyManagementException | + KeyStoreException e) { + throw new RuntimeException(e); + } + }); + } } @Override @@ -236,12 +260,17 @@ static class AuthRestClientFactory implements RestClientFactory { private final String username; private final String password; private transient CredentialsProvider credentialsProvider; + private final Boolean skipVerifySsl; public AuthRestClientFactory( - @Nullable String pathPrefix, String username, String password) { + @Nullable String pathPrefix, + String username, + String password, + Boolean skipVerifySsl) { this.pathPrefix = pathPrefix; this.password = password; this.username = username; + this.skipVerifySsl = skipVerifySsl; } @Override @@ -254,10 +283,28 @@ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { credentialsProvider.setCredentials( AuthScope.ANY, new UsernamePasswordCredentials(username, password)); } - restClientBuilder.setHttpClientConfigCallback( - httpAsyncClientBuilder -> - httpAsyncClientBuilder.setDefaultCredentialsProvider( - credentialsProvider)); + if (skipVerifySsl) { + restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { + try { + return httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .setSSLContext(SSLContexts + .custom() + .loadTrustMaterial(null, TrustAllStrategy.INSTANCE) + .build()); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } catch (KeyManagementException e) { + throw new RuntimeException(e); + } catch (KeyStoreException e) { + throw new RuntimeException(e); + } + }); + } else { + restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } } @Override diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java index a538122b..30caee31 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java @@ -76,11 +76,11 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { new Elasticsearch7DynamicSink.AuthRestClientFactory( config.getPathPrefix().orElse(null), config.getUsername().get(), - config.getPassword().get()); + config.getPassword().get(), config.getSkipVerifySsl().get()); } else { restClientFactory = new Elasticsearch7DynamicSink.DefaultRestClientFactory( - config.getPathPrefix().orElse(null)); + config.getPathPrefix().orElse(null), config.getSkipVerifySsl().get()); } Elasticsearch7ApiCallBridge elasticsearch7ApiCallBridge = diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java index b516777d..4cacffbf 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java @@ -60,6 +60,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECT_SKIP_VERIFY_SSL; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION; @@ -96,6 +97,7 @@ public class Elasticsearch7DynamicTableFactory BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, BULK_FLUSH_BACKOFF_DELAY_OPTION, CONNECTION_PATH_PREFIX, + CONNECT_SKIP_VERIFY_SSL, FORMAT_OPTION, PASSWORD_OPTION, USERNAME_OPTION, diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java index 8bd39217..c1a36c71 100644 --- a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java +++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java @@ -92,7 +92,7 @@ public void testBuilder() { verify(provider.builderSpy).setBulkFlushMaxSizeMb(1); verify(provider.builderSpy) .setRestClientFactory( - new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp")); + new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp", true)); verify(provider.sinkSpy).disableFlushOnCheckpoint(); } @@ -124,7 +124,8 @@ public void testDefaultConfig() { verify(provider.builderSpy).setBulkFlushMaxActions(1000); verify(provider.builderSpy).setBulkFlushMaxSizeMb(2); verify(provider.builderSpy) - .setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory(null)); + .setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory( + null, true)); verify(provider.sinkSpy, never()).disableFlushOnCheckpoint(); } @@ -160,7 +161,7 @@ public void testAuthConfig() { verify(provider.builderSpy) .setRestClientFactory( new Elasticsearch7DynamicSink.AuthRestClientFactory( - null, USERNAME, PASSWORD)); + null, USERNAME, PASSWORD, true)); verify(provider.sinkSpy, never()).disableFlushOnCheckpoint(); }