diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index e1ba2d4..e3a7f8b 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -25,4 +25,4 @@ jobs: compile_and_test: uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: - flink_version: 1.16.0 + flink_version: 1.16.1 diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java index 7f17c68..accf2a5 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java @@ -29,20 +29,12 @@ import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.conn.ssl.TrustAllStrategy; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.ssl.SSLContexts; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +42,6 @@ import java.io.IOException; import java.net.ConnectException; import java.net.NoRouteToHostException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -135,12 +124,7 @@ class OpensearchAsyncWriter extends AsyncSinkWriter { - if (networkClientConfig.getPassword() != null - && networkClientConfig.getUsername() != null) { - final CredentialsProvider credentialsProvider = - new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials( - networkClientConfig.getUsername(), - networkClientConfig.getPassword())); - - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } - - if (networkClientConfig.isAllowInsecure().orElse(false)) { - try { - httpClientBuilder.setSSLContext( - SSLContexts.custom() - .loadTrustMaterial(new TrustAllStrategy()) - .build()); - } catch (final NoSuchAlgorithmException - | KeyStoreException - | KeyManagementException ex) { - throw new IllegalStateException( - "Unable to create custom SSL context", ex); - } - } - - return httpClientBuilder; - }); - if (networkClientConfig.getConnectionRequestTimeout() != null - || networkClientConfig.getConnectionTimeout() != null - || networkClientConfig.getSocketTimeout() != null) { - builder.setRequestConfigCallback( - requestConfigBuilder -> { - if (networkClientConfig.getConnectionRequestTimeout() != null) { - requestConfigBuilder.setConnectionRequestTimeout( - networkClientConfig.getConnectionRequestTimeout()); - } - if (networkClientConfig.getConnectionTimeout() != null) { - requestConfigBuilder.setConnectTimeout( - networkClientConfig.getConnectionTimeout()); - } - if (networkClientConfig.getSocketTimeout() != null) { - requestConfigBuilder.setSocketTimeout( - networkClientConfig.getSocketTimeout()); - } - return requestConfigBuilder; - }); - } - return builder; - } } diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java new file mode 100644 index 0000000..22784db --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchRestClientCreator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContexts; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; + +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.List; + +/** The utility class to encapsulate {@link RestHighLevelClient} creation. */ +class OpensearchRestClientCreator { + /** Utility class. */ + private OpensearchRestClientCreator() {} + + /** + * Creates new instance of {@link RestHighLevelClient}. + * + * @param hosts list of hosts to connect + * @param networkClientConfig client network configuration + * @return new instance of {@link RestHighLevelClient} + */ + static RestHighLevelClient create( + final List hosts, final NetworkClientConfig networkClientConfig) { + return new RestHighLevelClient( + configureRestClientBuilder( + RestClient.builder(hosts.toArray(new HttpHost[0])), networkClientConfig)); + } + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, NetworkClientConfig networkClientConfig) { + if (networkClientConfig.getConnectionPathPrefix() != null) { + builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix()); + } + + builder.setHttpClientConfigCallback( + httpClientBuilder -> { + if (networkClientConfig.getPassword() != null + && networkClientConfig.getUsername() != null) { + final CredentialsProvider credentialsProvider = + new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials( + networkClientConfig.getUsername(), + networkClientConfig.getPassword())); + + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + if (networkClientConfig.isAllowInsecure().orElse(false)) { + try { + httpClientBuilder.setSSLContext( + SSLContexts.custom() + .loadTrustMaterial(new TrustAllStrategy()) + .build()); + } catch (final NoSuchAlgorithmException + | KeyStoreException + | KeyManagementException ex) { + throw new IllegalStateException( + "Unable to create custom SSL context", ex); + } + } + + return httpClientBuilder; + }); + if (networkClientConfig.getConnectionRequestTimeout() != null + || networkClientConfig.getConnectionTimeout() != null + || networkClientConfig.getSocketTimeout() != null) { + builder.setRequestConfigCallback( + requestConfigBuilder -> { + if (networkClientConfig.getConnectionRequestTimeout() != null) { + requestConfigBuilder.setConnectionRequestTimeout( + networkClientConfig.getConnectionRequestTimeout()); + } + if (networkClientConfig.getConnectionTimeout() != null) { + requestConfigBuilder.setConnectTimeout( + networkClientConfig.getConnectionTimeout()); + } + if (networkClientConfig.getSocketTimeout() != null) { + requestConfigBuilder.setSocketTimeout( + networkClientConfig.getSocketTimeout()); + } + return requestConfigBuilder; + }); + } + return builder; + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java index 3231b28..1b0d216 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java @@ -27,12 +27,6 @@ import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.conn.ssl.TrustAllStrategy; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.ssl.SSLContexts; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BackoffPolicy; @@ -44,8 +38,6 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; @@ -55,9 +47,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.List; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -107,11 +96,7 @@ class OpensearchWriter implements SinkWriter { this.emitter = checkNotNull(emitter); this.flushOnCheckpoint = flushOnCheckpoint; this.mailboxExecutor = checkNotNull(mailboxExecutor); - this.client = - new RestHighLevelClient( - configureRestClientBuilder( - RestClient.builder(hosts.toArray(new HttpHost[0])), - networkClientConfig)); + this.client = OpensearchRestClientCreator.create(hosts, networkClientConfig); this.bulkProcessor = createBulkProcessor(bulkProcessorConfig); this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter()); checkNotNull(metricGroup); @@ -161,66 +146,6 @@ public void close() throws Exception { client.close(); } - private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, NetworkClientConfig networkClientConfig) { - if (networkClientConfig.getConnectionPathPrefix() != null) { - builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix()); - } - - builder.setHttpClientConfigCallback( - httpClientBuilder -> { - if (networkClientConfig.getPassword() != null - && networkClientConfig.getUsername() != null) { - final CredentialsProvider credentialsProvider = - new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials( - networkClientConfig.getUsername(), - networkClientConfig.getPassword())); - - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } - - if (networkClientConfig.isAllowInsecure().orElse(false)) { - try { - httpClientBuilder.setSSLContext( - SSLContexts.custom() - .loadTrustMaterial(new TrustAllStrategy()) - .build()); - } catch (final NoSuchAlgorithmException - | KeyStoreException - | KeyManagementException ex) { - throw new IllegalStateException( - "Unable to create custom SSL context", ex); - } - } - - return httpClientBuilder; - }); - if (networkClientConfig.getConnectionRequestTimeout() != null - || networkClientConfig.getConnectionTimeout() != null - || networkClientConfig.getSocketTimeout() != null) { - builder.setRequestConfigCallback( - requestConfigBuilder -> { - if (networkClientConfig.getConnectionRequestTimeout() != null) { - requestConfigBuilder.setConnectionRequestTimeout( - networkClientConfig.getConnectionRequestTimeout()); - } - if (networkClientConfig.getConnectionTimeout() != null) { - requestConfigBuilder.setConnectTimeout( - networkClientConfig.getConnectionTimeout()); - } - if (networkClientConfig.getSocketTimeout() != null) { - requestConfigBuilder.setSocketTimeout( - networkClientConfig.getSocketTimeout()); - } - return requestConfigBuilder; - }); - } - return builder; - } - private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) { final BulkProcessor.Builder builder = diff --git a/pom.xml b/pom.xml index 079703b..015f2d7 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ under the License. - 1.16.0 + 1.16.1 15.0 2.13.4.20221013