diff --git a/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java b/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java
new file mode 100644
index 0000000..065c3a8
--- /dev/null
+++ b/flink-connector-opensearch-e2e-tests/src/main/java/org/apache/flink/streaming/tests/OpensearchAsyncSinkExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSink;
+import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** End to end test for OpensearchAsyncSink. */
+public class OpensearchAsyncSinkExample {
+
+    public static void main(String[] args) throws Exception {
+
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println(
+                    "Missing parameters!\n" + "Usage: --numRecords <numRecords> --index <index>");
+            return;
+        }
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(5000);
+
+        DataStream<Tuple2<String, String>> source =
+                env.fromSequence(0, parameterTool.getInt("numRecords") - 1)
+                        .flatMap(
+                                new FlatMapFunction<Long, Tuple2<String, String>>() {
+                                    @Override
+                                    public void flatMap(
+                                            Long value, Collector<Tuple2<String, String>> out) {
+                                        final String key = String.valueOf(value);
+                                        final String message = "message #" + value;
+                                        out.collect(Tuple2.of(key, message + "update #1"));
+                                        out.collect(Tuple2.of(key, message + "update #2"));
+                                    }
+                                });
+
+        List<HttpHost> httpHosts = new ArrayList<>();
+        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+
+        OpensearchAsyncSinkBuilder<Tuple2<String, String>> osSinkBuilder =
+                OpensearchAsyncSink.<Tuple2<String, String>>builder()
+                        .setHosts(httpHosts.toArray(new HttpHost[0]))
+                        .setElementConverter(
+                                (element, context) ->
+                                        new IndexRequest("my-index")
+                                                .id(element.f0.toString())
+                                                .source(element.f1));
+
+        source.sinkTo(osSinkBuilder.build());
+
+        env.execute("Opensearch end to end async sink test example");
+    }
+}
diff --git a/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484 b/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484
index 02ea2fd..1ef9c43 100644
--- a/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484
+++ b/flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484
@@ -1,3 +1,15 @@
+org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.opensearch.sink.OpensearchAsyncWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
 org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy: only one of the following predicates match:\
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
diff --git a/flink-connector-opensearch/pom.xml b/flink-connector-opensearch/pom.xml
index fdc52b2..0cb47be 100644
--- a/flink-connector-opensearch/pom.xml
+++ b/flink-connector-opensearch/pom.xml
@@ -118,6 +118,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java
new file mode 100644
index 0000000..2edd97f
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.io.stream.InputStreamStreamInput;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable},
+ * which is required by the AsyncSink scaffolding.
+ *
+ * @param <T> type of the write request
+ */
+@PublicEvolving
+public class DocSerdeRequest<T> implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final DocWriteRequest<T> request;
+
+    private DocSerdeRequest(DocWriteRequest<T> request) {
+        this.request = request;
+    }
+
+    public DocWriteRequest<T> getRequest() {
+        return request;
+    }
+
+    static <T> DocSerdeRequest<T> from(DocWriteRequest<T> request) {
+        return new DocSerdeRequest<>(request);
+    }
+
+    static DocSerdeRequest<?> readFrom(long requestSize, DataInputStream in) throws IOException {
+        try (final StreamInput stream = new InputStreamStreamInput(in, requestSize)) {
+            return new DocSerdeRequest<>(readDocumentRequest(stream));
+        }
+    }
+
+    void writeTo(DataOutputStream out) throws IOException {
+        try (BytesStreamOutput stream = new BytesStreamOutput()) {
+            writeDocumentRequest(stream, request);
+            out.write(BytesReference.toBytes(stream.bytes()));
+        }
+    }
+
+    /** Read a document write (index/delete/update) request. */
+    private static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException {
+        byte type = in.readByte();
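+        // The type byte is written by writeDocumentRequest below:
+        // 0 = IndexRequest, 1 = DeleteRequest, 2 = UpdateRequest.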
+        DocWriteRequest<?> docWriteRequest;
+        if (type == 0) {
+            docWriteRequest = new IndexRequest(in);
+        } else if (type == 1) {
+            docWriteRequest = new DeleteRequest(in);
+        } else if (type == 2) {
+            docWriteRequest = new UpdateRequest(in);
+        } else {
+            throw new IllegalStateException("Invalid request type [" + type + " ]");
+        }
+        return docWriteRequest;
+    }
+
+    /** Write a document write (index/delete/update) request. */
+    private static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)
+            throws IOException {
+        if (request instanceof IndexRequest) {
+            out.writeByte((byte) 0);
+            ((IndexRequest) request).writeTo(out);
+        } else if (request instanceof DeleteRequest) {
+            out.writeByte((byte) 1);
+            ((DeleteRequest) request).writeTo(out);
+        } else if (request instanceof UpdateRequest) {
+            out.writeByte((byte) 2);
+            ((UpdateRequest) request).writeTo(out);
+        } else {
+            throw new IllegalStateException(
+                    "Invalid request [" + request.getClass().getSimpleName() + " ]");
+        }
+    }
+}
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
new file mode 100644
index 0000000..f62e55b
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.http.HttpHost;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Flink's Async Sink to insert or update data in an Opensearch index (see {@link
+ * OpensearchAsyncWriter}).
+ *
+ * @param <InputT> type of the records converted to Opensearch actions (instances of {@link
+ *     DocSerdeRequest})
+ * @see OpensearchAsyncSinkBuilder on how to construct an OpensearchAsyncSink
+ */
+@PublicEvolving
+public class OpensearchAsyncSink<InputT> extends AsyncSinkBase<InputT, DocSerdeRequest<?>> {
+    private static final long serialVersionUID = 1L;
+
+    private final List<HttpHost> hosts;
+    private final NetworkClientConfig networkClientConfig;
+
+    /**
+     * Constructor creating an Opensearch async sink.
+     *
+     * @param maxBatchSize the maximum size of a batch of entries that may be sent
+     * @param maxInFlightRequests the maximum number of in flight requests that may exist, if any
+     *     more in flight requests need to be initiated once the maximum has been reached, then it
+     *     will be blocked until some have completed
+     * @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add
+     *     elements will be blocked while the number of elements in the buffer is at the maximum
+     * @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to
+     *     Opensearch measured in bytes
+     * @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the
+     *     buffer, if any element reaches this age, the entire buffer will be flushed immediately
+     * @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the
+     *     buffer, a record of size larger than this will be rejected when passed to the sink
+     * @param elementConverter converting incoming records to Opensearch write document requests
+     * @param hosts the reachable Opensearch cluster nodes
+     * @param networkClientConfig describing properties of the network connection used to connect
+     *     to the Opensearch cluster
+     */
+    OpensearchAsyncSink(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            ElementConverter<InputT, DocSerdeRequest<?>> elementConverter,
+            List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig) {
+        super(
+                elementConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.hosts = checkNotNull(hosts);
+        checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
+        this.networkClientConfig = checkNotNull(networkClientConfig);
+    }
+
+    /**
+     * Create a {@link OpensearchAsyncSinkBuilder} to construct a new {@link OpensearchAsyncSink}.
+     *
+     * @param <InputT> type of incoming records
+     * @return {@link OpensearchAsyncSinkBuilder}
+     */
+    public static <InputT> OpensearchAsyncSinkBuilder<InputT> builder() {
+        return new OpensearchAsyncSinkBuilder<>();
+    }
+
+    @Internal
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<DocSerdeRequest<?>>> createWriter(
+            InitContext context) throws IOException {
+        return new OpensearchAsyncWriter<>(
+                context,
+                getElementConverter(),
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                hosts,
+                networkClientConfig,
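+                // a fresh writer starts without any recovered buffered requests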
+                Collections.emptyList());
+    }
+
+    @Internal
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<DocSerdeRequest<?>>> restoreWriter(
+            InitContext context,
+            Collection<BufferedRequestState<DocSerdeRequest<?>>> recoveredState)
+            throws IOException {
+        return new OpensearchAsyncWriter<>(
+                context,
+                getElementConverter(),
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                hosts,
+                networkClientConfig,
+                recoveredState);
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<BufferedRequestState<DocSerdeRequest<?>>>
+            getWriterStateSerializer() {
+        return new OpensearchWriterStateSerializer();
+    }
+}
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
new file mode 100644
index 0000000..e1bdd65
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.DocWriteRequest;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Builder to construct an Opensearch compatible {@link OpensearchAsyncSink}.
+ *
+ * <p>The following example shows the minimal setup to create an OpensearchAsyncSink that submits
+ * actions with the default number of actions to buffer (1000).
+ *
+ * <pre>{@code
+ * OpensearchAsyncSink<Tuple2<String, String>> sink = OpensearchAsyncSink
+ *     .<Tuple2<String, String>>builder()
+ *     .setHosts(new HttpHost("localhost", 9200, "http"))
+ *     .setElementConverter((element, context) ->
+ *         new IndexRequest("my-index").id(element.f0.toString()).source(element.f1))
+ *     .build();
+ * }</pre>
+ *
+ * @param <InputT> type of the records converted to Opensearch actions
+ */
+@PublicEvolving
+public class OpensearchAsyncSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<
+                InputT, DocSerdeRequest<?>, OpensearchAsyncSinkBuilder<InputT>> {
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer connectionRequestTimeout;
+    private Integer socketTimeout;
+    private Boolean allowInsecure;
+    private ElementConverter<InputT, DocSerdeRequest<?>> elementConverter;
+
+    /**
+     * Sets the element converter.
+     *
+     * @param elementConverter element converter
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setElementConverter(
+            ElementConverter<InputT, DocWriteRequest<?>> elementConverter) {
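+        // Wrap the user-facing DocWriteRequest converter into DocSerdeRequest so that the
+        // buffered entries are Serializable, as required by the AsyncSink base classes.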
+        this.elementConverter =
+                (element, context) ->
+                        DocSerdeRequest.from(elementConverter.apply(element, context));
+        return this;
+    }
+
+    /**
+     * Sets the hosts where the Opensearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts);
+        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        this.hosts = Arrays.asList(hosts);
+        return this;
+    }
+
+    /**
+     * Sets the username used to authenticate the connection with the Opensearch cluster.
+     *
+     * @param username of the Opensearch cluster user
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setConnectionUsername(String username) {
+        checkNotNull(username);
+        this.username = username;
+        return this;
+    }
+
+    /**
+     * Sets the password used to authenticate the connection with the Opensearch cluster.
+     *
+     * @param password of the Opensearch cluster user
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setConnectionPassword(String password) {
+        checkNotNull(password);
+        this.password = password;
+        return this;
+    }
+
+    /**
+     * Sets a prefix which is used for every REST communication to the Opensearch cluster.
+     *
+     * @param prefix for the communication
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setConnectionPathPrefix(String prefix) {
+        checkNotNull(prefix);
+        this.connectionPathPrefix = prefix;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for requesting a connection to the Opensearch cluster from the connection
+     * manager.
+     *
+     * @param timeout for the connection request
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setConnectionRequestTimeout(int timeout) {
+        checkState(timeout >= 0, "Connection request timeout must be larger than or equal to 0.");
+        this.connectionRequestTimeout = timeout;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for establishing a connection to the Opensearch cluster.
+     *
+     * @param timeout for the connection
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setConnectionTimeout(int timeout) {
+        checkState(timeout >= 0, "Connection timeout must be larger than or equal to 0.");
+        this.connectionTimeout = timeout;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for waiting for data or, put differently, the maximum period of inactivity
+     * between two consecutive data packets.
+     *
+     * @param timeout for the socket
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setSocketTimeout(int timeout) {
+        checkState(timeout >= 0, "Socket timeout must be larger than or equal to 0.");
+        this.socketTimeout = timeout;
+        return this;
+    }
+
+    /**
+     * Allows bypassing the certificate chain validation and connecting to insecure network
+     * endpoints (for example, servers which use self-signed certificates).
+     *
+     * @param allowInsecure whether to allow connecting to insecure network endpoints
+     * @return this builder
+     */
+    public OpensearchAsyncSinkBuilder<InputT> setAllowInsecure(boolean allowInsecure) {
+        this.allowInsecure = allowInsecure;
+        return this;
+    }
+
+    @Override
+    public OpensearchAsyncSink<InputT> build() {
+        checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
+
+        return new OpensearchAsyncSink<>(
+                nonNullOrDefault(
+                        getMaxBatchSize(),
+                        1000), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION */
+                nonNullOrDefault(
+                        getMaxInFlightRequests(), 1), /* BulkProcessor::concurrentRequests */
+                nonNullOrDefault(getMaxBufferedRequests(), 10000),
+                nonNullOrDefault(
+                        getMaxBatchSizeInBytes(),
+                        2 * 1024
+                                * 1024), /* OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION */
+                nonNullOrDefault(
+                        getMaxTimeInBufferMS(),
+                        1000), /* OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION */
+                nonNullOrDefault(getMaxRecordSizeInBytes(), 1 * 1024 * 1024), /* 1 MB */
+                elementConverter,
+                hosts,
+                buildNetworkClientConfig());
+    }
+
+    private static int nonNullOrDefault(Integer value, int defaultValue) {
+        return (value != null) ? value : defaultValue;
+    }
+
+    private static long nonNullOrDefault(Long value, long defaultValue) {
+        return (value != null) ? value : defaultValue;
+    }
+
+    private NetworkClientConfig buildNetworkClientConfig() {
+        return new NetworkClientConfig(
+                username,
+                password,
+                connectionPathPrefix,
+                connectionRequestTimeout,
+                connectionTimeout,
+                socketTimeout,
+                allowInsecure);
+    }
+}
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
new file mode 100644
index 0000000..d5dcbaa
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContexts;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Flink's Async Sink Writer to insert or update data in an Opensearch index (see {@link
+ * OpensearchAsyncSink}).
+ *
+ * @param <InputT> type of the records converted to Opensearch actions (instances of {@link
+ *     DocSerdeRequest})
+ */
+@Internal
+class OpensearchAsyncWriter<InputT> extends AsyncSinkWriter<InputT, DocSerdeRequest<?>> {
+    private static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncWriter.class);
+
+    private final RestHighLevelClient client;
+    private final Counter numRecordsOutErrorsCounter;
+    private volatile boolean closed = false;
+
+    /**
+     * Constructor creating an Opensearch async writer.
+     *
+     * @param context the initialization context
+     * @param elementConverter converting incoming records to Opensearch write document requests
+     * @param maxBatchSize the maximum size of a batch of entries that may be sent
+     * @param maxInFlightRequests the maximum number of in flight requests that may exist, if any
+     *     more in flight requests need to be initiated once the maximum has been reached, then it
+     *     will be blocked until some have completed
+     * @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add
+     *     elements will be blocked while the number of elements in the buffer is at the maximum
+     * @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to
+     *     Opensearch measured in bytes
+     * @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the
+     *     buffer, if any element reaches this age, the entire buffer will be flushed immediately
+     * @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the
+     *     buffer, a record of size larger than this will be rejected when passed to the sink
+     * @param hosts the reachable Opensearch cluster nodes
+     * @param networkClientConfig describing properties of the network connection used to connect
+     *     to the Opensearch cluster
+     * @param initialStates the initial state of the sink
+     */
+    OpensearchAsyncWriter(
+            Sink.InitContext context,
+            ElementConverter<InputT, DocSerdeRequest<?>> elementConverter,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,
+            Collection<BufferedRequestState<DocSerdeRequest<?>>> initialStates) {
+        super(
+                elementConverter,
+                context,
+                AsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                initialStates);
+
+        this.client =
+                new RestHighLevelClient(
+                        configureRestClientBuilder(
+                                RestClient.builder(hosts.toArray(new HttpHost[0])),
+                                networkClientConfig));
+
+        final SinkWriterMetricGroup metricGroup = context.metricGroup();
+        checkNotNull(metricGroup);
+
+        this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<DocSerdeRequest<?>> requestEntries,
+            Consumer<List<DocSerdeRequest<?>>> requestResult) {
+
+        BulkRequest bulkRequest = new BulkRequest();
+        requestEntries.forEach(r -> bulkRequest.add(r.getRequest()));
+
+        final CompletableFuture<BulkResponse> future = new CompletableFuture<>();
+        client.bulkAsync(
+                bulkRequest,
+                RequestOptions.DEFAULT,
+                new ActionListener<BulkResponse>() {
+                    @Override
+                    public void onResponse(BulkResponse response) {
+                        future.complete(response);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        future.completeExceptionally(e);
+                    }
+                });
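+        // Entries handed to requestResult are re-buffered and retried by the AsyncSinkWriter;
+        // an empty list signals that the whole batch was persisted successfully.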
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        handleFullyFailedBulkRequest(err, requestEntries, requestResult);
+                    } else if (response.hasFailures()) {
+                        handlePartiallyFailedBulkRequests(response, requestEntries, requestResult);
+                    } else {
+                        requestResult.accept(Collections.emptyList());
+                    }
+                });
+    }
+
+    @Override
+    protected long getSizeInBytes(DocSerdeRequest<?> requestEntry) {
+        return requestEntry.getRequest().ramBytesUsed();
+    }
+
+    @Override
+    public void close() {
+        if (!closed) {
+            closed = true;
+
+            try {
+                client.close();
+            } catch (final IOException ex) {
+                LOG.warn("Error while closing RestHighLevelClient instance", ex);
+            }
+        }
+    }
+
+    private void handleFullyFailedBulkRequest(
+            Throwable err,
+            List<DocSerdeRequest<?>> requestEntries,
+            Consumer<List<DocSerdeRequest<?>>> requestResult) {
+        numRecordsOutErrorsCounter.inc(requestEntries.size());
+        requestResult.accept(requestEntries);
+    }
+
+    private void handlePartiallyFailedBulkRequests(
+            BulkResponse response,
+            List<DocSerdeRequest<?>> requestEntries,
+            Consumer<List<DocSerdeRequest<?>>> requestResult) {
+
+        final List<DocSerdeRequest<?>> failedRequestEntries = new ArrayList<>();
+        final BulkItemResponse[] items = response.getItems();
+
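+        // Hand back only the entries whose bulk items failed; successfully
+        // persisted entries must not be retried.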
+        for (int i = 0; i < items.length; i++) {
+            if (items[i].getFailure() != null) {
+                failedRequestEntries.add(DocSerdeRequest.from(requestEntries.get(i).getRequest()));
+            }
+        }
+
+        numRecordsOutErrorsCounter.inc(failedRequestEntries.size());
+        requestResult.accept(failedRequestEntries);
+    }
+
+    private static RestClientBuilder configureRestClientBuilder(
+            RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
+        if (networkClientConfig.getConnectionPathPrefix() != null) {
+            builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
+        }
+
+        builder.setHttpClientConfigCallback(
+                httpClientBuilder -> {
+                    if (networkClientConfig.getPassword() != null
+                            && networkClientConfig.getUsername() != null) {
+                        final CredentialsProvider credentialsProvider =
+                                new BasicCredentialsProvider();
+                        credentialsProvider.setCredentials(
+                                AuthScope.ANY,
+                                new UsernamePasswordCredentials(
+                                        networkClientConfig.getUsername(),
+                                        networkClientConfig.getPassword()));
+
+                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    }
+
+                    if (networkClientConfig.isAllowInsecure().orElse(false)) {
+                        try {
+                            httpClientBuilder.setSSLContext(
+                                    SSLContexts.custom()
+                                            .loadTrustMaterial(new TrustAllStrategy())
+                                            .build());
+                        } catch (final NoSuchAlgorithmException
+                                | KeyStoreException
+                                | KeyManagementException ex) {
+                            throw new IllegalStateException(
+                                    "Unable to create custom SSL context", ex);
+                        }
+                    }
+
+                    return httpClientBuilder;
+                });
+        if (networkClientConfig.getConnectionRequestTimeout() != null
+                || networkClientConfig.getConnectionTimeout() != null
+                || networkClientConfig.getSocketTimeout() != null) {
+            builder.setRequestConfigCallback(
+                    requestConfigBuilder -> {
+                        if (networkClientConfig.getConnectionRequestTimeout() != null) {
+                            requestConfigBuilder.setConnectionRequestTimeout(
+                                    networkClientConfig.getConnectionRequestTimeout());
+                        }
+                        if (networkClientConfig.getConnectionTimeout() != null) {
+                            requestConfigBuilder.setConnectTimeout(
+                                    networkClientConfig.getConnectionTimeout());
+                        }
+                        if (networkClientConfig.getSocketTimeout() != null) {
+                            requestConfigBuilder.setSocketTimeout(
+                                    networkClientConfig.getSocketTimeout());
+                        }
+                        return requestConfigBuilder;
+                    });
+        }
+        return builder;
+    }
+}
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
index 895ca03..208ef37 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
@@ -38,7 +38,7 @@
  * on checkpoint or the default number of actions was buffered (1000).
  *
  * <pre>{@code
- * OpensearchSink sink = new OpensearchSinkBuilder()
+ * OpensearchSink<Tuple2<Integer, String>> sink = new OpensearchSinkBuilder<Tuple2<Integer, String>>()
  *     .setHosts(new HttpHost("localhost:9200")
  *     .setEmitter((element, context, indexer) -> {
  *          indexer.add(
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
new file mode 100644
index 0000000..e41dcaa
--- /dev/null
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterStateSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Apache Flink's Async Sink writer state for Opensearch document write requests (see {@link
+ * DocWriteRequest}), wrapped into {@link DocSerdeRequest}.
+ */
+@Internal
+class OpensearchWriterStateSerializer extends AsyncSinkWriterStateSerializer<DocSerdeRequest<?>> {
+    @Override
+    protected void serializeRequestToStream(DocSerdeRequest<?> request, DataOutputStream out)
+            throws IOException {
+        request.writeTo(out);
+    }
+
+    @Override
+    protected DocSerdeRequest<?> deserializeRequestFromStream(long requestSize, DataInputStream in)
+            throws IOException {
+        return DocSerdeRequest.readFrom(requestSize, in);
+    }
+
+    @Override
+    public int getVersion() {
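+        // Serialized format version; bump this when DocSerdeRequest's wire format changes.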
+        return 1;
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
new file mode 100644
index 0000000..b36c077
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestFactory;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link OpensearchAsyncSinkBuilder}. */
+@ExtendWith(TestLoggerExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class OpensearchAsyncSinkBuilderTest {
+
+    @TestFactory
+    Stream<DynamicTest> testValidBuilders() {
+        Stream<OpensearchAsyncSinkBuilder<Object>> validBuilders =
+                Stream.of(
+                        createMinimalBuilder(),
+                        createMinimalBuilder()
+                                .setConnectionUsername("username")
+                                .setConnectionPassword("password"));
+
+        return DynamicTest.stream(
+                validBuilders,
+                OpensearchAsyncSinkBuilder::toString,
+                builder -> assertThatNoException().isThrownBy(builder::build));
+    }
+
+    @Test
+    void testThrowIfHostsNotSet() {
+        assertThatThrownBy(
+                        () ->
+                                createEmptyBuilder()
+                                        .setElementConverter((element, context) -> null)
+                                        .build())
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    void testThrowIfElementConverterNotSet() {
+        assertThatThrownBy(
+                        () -> createEmptyBuilder().setHosts(new HttpHost("localhost:3000")).build())
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    void testThrowIfSetInvalidTimeouts() {
+        assertThatThrownBy(() -> createEmptyBuilder().setConnectionRequestTimeout(-1).build())
+                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(() -> createEmptyBuilder().setConnectionTimeout(-1).build())
+                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(() -> createEmptyBuilder().setSocketTimeout(-1).build())
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    private OpensearchAsyncSinkBuilder<Object> createEmptyBuilder() {
+        return new OpensearchAsyncSinkBuilder<>();
+    }
+
+    private OpensearchAsyncSinkBuilder<Object> createMinimalBuilder() {
+        return new OpensearchAsyncSinkBuilder<>()
+                .setElementConverter((element, context) -> null)
+                .setHosts(new HttpHost("localhost:3000"));
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java
new file mode 100644
index 0000000..1930553
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkITCase.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.opensearch.OpensearchUtil;
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OpensearchAsyncSink}. */
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class OpensearchAsyncSinkITCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncSinkITCase.class);
+    private static boolean failed;
+
+    private RestHighLevelClient client;
+    private OpensearchTestClient context;
+
+    @Container
+    private static final OpensearchContainer OS_CONTAINER =
+            OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG);
+
+    @BeforeEach
+    void setUp() {
+        failed = false;
+        client = OpensearchUtil.createClient(OS_CONTAINER);
+        context = new OpensearchTestClient(client);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("opensearchConverters")
+    void testWriteJsonToOpensearch(
+            BiFunction<
+                            String,
+                            String,
+                            ElementConverter<Tuple2<Integer, String>, DocWriteRequest<?>>>
+                    converterProvider)
+            throws Exception {
+        final String index = "test-opensearch-async-sink-" + UUID.randomUUID();
+        runTest(index, false, converterProvider, null);
+    }
+
+    @Test
+    void testRecovery() throws Exception {
+        final String index = "test-recovery-opensearch-async-sink";
+        runTest(index, true, TestConverter::jsonConverter, new FailingMapper());
+        assertThat(failed).isTrue();
+    }
+
+    private void runTest(
+            String index,
+            boolean allowRestarts,
+            BiFunction<
+                            String,
+                            String,
+                            ElementConverter<Tuple2<Integer, String>, DocWriteRequest<?>>>
+                    converterProvider,
+            @Nullable MapFunction<Long, Long> additionalMapper)
+            throws Exception {
+        final OpensearchAsyncSinkBuilder<Tuple2<Integer, String>> builder =
+                OpensearchAsyncSink.<Tuple2<Integer, String>>builder()
+                        .setHosts(HttpHost.create(OS_CONTAINER.getHttpHostAddress()))
+                        .setElementConverter(
+                                converterProvider.apply(index, context.getDataFieldName()))
+                        .setMaxBatchSize(5)
+                        .setConnectionUsername(OS_CONTAINER.getUsername())
+                        .setConnectionPassword(OS_CONTAINER.getPassword())
+                        .setAllowInsecure(true);
+
+        try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
+            env.enableCheckpointing(100L);
+            if (!allowRestarts) {
+                env.setRestartStrategy(RestartStrategies.noRestart());
+            }
+            DataStream<Long> stream = env.fromSequence(1, 5);
+
+            if (additionalMapper != null) {
+                stream = stream.map(additionalMapper);
+            }
+
+            stream.map(
+                            new MapFunction<Long, Tuple2<Integer, String>>() {
+                                @Override
+                                public Tuple2<Integer, String> map(Long value) throws Exception {
+                                    return Tuple2.of(
+                                            value.intValue(),
+                                            OpensearchTestClient.buildMessage(value.intValue()));
+                                }
+                            })
+                    .sinkTo(builder.build());
+            env.execute();
+            context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+        }
+    }
+
+    private static List<
+                    BiFunction<
+                            String,
+                            String,
+                            ElementConverter<Tuple2<Integer, String>, DocWriteRequest<?>>>>
+            opensearchConverters() {
+        return Arrays.asList(TestConverter::jsonConverter, TestConverter::smileConverter);
+    }
+
+    private static class FailingMapper implements MapFunction<Long, Long>, CheckpointListener {
+        private static final long serialVersionUID = 1L;
+        private int emittedRecords = 0;
+
+        @Override
+        public Long map(Long value) throws Exception {
+            Thread.sleep(50);
+            emittedRecords++;
+            return value;
+        }
+
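+        // Fail exactly once, on the first checkpoint completion after at least one record
+        // was emitted, so the job restarts and exercises the sink's recovery path.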
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (failed || emittedRecords == 0) {
+                return;
+            }
+            failed = true;
+            throw new Exception("Expected failure");
+        }
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
new file mode 100644
index 0000000..313f85e
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriterITCase.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.opensearch.OpensearchUtil;
+import org.apache.flink.connector.opensearch.test.DockerImageVersions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OpensearchAsyncWriter}. */
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class OpensearchAsyncWriterITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpensearchAsyncWriterITCase.class);
+
+    @Container
+    private static final OpensearchContainer OS_CONTAINER =
+            OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG);
+
+    private RestHighLevelClient client;
+    private OpensearchTestClient clientContext;
+    private TestSinkInitContext context;
+
+    private final Lock lock = new ReentrantLock();
+    private final Condition completed = lock.newCondition();
+    private final List<DocSerdeRequest<?>> requests = new ArrayList<>();
+
+    @BeforeEach
+    void setUp() {
+        client = OpensearchUtil.createClient(OS_CONTAINER);
+        clientContext = new OpensearchTestClient(client);
+        context = new TestSinkInitContext();
+        requests.clear();
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Test
+    @Timeout(5)
+    void testWriteOnBulkFlush() throws Exception {
+        final String index = "test-bulk-flush-without-checkpoint-async";
+        final int maxBatchSize = 5;
+
+        try (final OpensearchAsyncWriter<Tuple2<Integer, String>> writer =
+                createWriter(context, index, maxBatchSize, Long.MAX_VALUE)) {
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+
+            // Ignore flush on checkpoint
+            writer.flush(false);
+            clientContext.assertThatIdsAreNotWritten(index, 1, 2, 3, 4);
+
+            // Trigger flush
+            writer.write(Tuple2.of(5, "test-5"), null);
+
+            /* await for async bulk request to complete */
+            awaitForCompletion();
+
+            clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+
+            writer.write(Tuple2.of(6, "test-6"), null);
+            clientContext.assertThatIdsAreNotWritten(index, 6);
+
+            // Force flush
+            writer.flush(true);
+            clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5, 6);
+        }
+    }
+
+    @Test
+    @Timeout(5)
+    void testWriteOnBulkIntervalFlush() throws Exception {
+        final String index = "test-bulk-flush-with-interval-async";
+
+        try (final OpensearchAsyncWriter<Tuple2<Integer, String>> writer =
+                createWriter(context, index, 10, 1000 /* 1s */)) {
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+
+            /* advance timer */
+            context.getTestProcessingTimeService().advance(1200);
+
+            /* await for async bulk request to complete */
+            awaitForCompletion();
+        }
+
+        clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4);
+    }
+
+    @Test
+    void testIncrementByteOutMetric() throws Exception {
+        final String index = "test-inc-byte-out-async";
+        final int flushAfterNActions = 2;
+
+        try (final OpensearchAsyncWriter<Tuple2<Integer, String>> writer =
+                createWriter(context, index, flushAfterNActions, Long.MAX_VALUE)) {
+            final Counter numBytesOut = context.getNumBytesOutCounter();
+            assertThat(numBytesOut.getCount()).isEqualTo(0);
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+
+            writer.flush(true);
+            long first = numBytesOut.getCount();
+
+            assertThat(first).isGreaterThan(0);
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+
+            writer.flush(true);
+            assertThat(numBytesOut.getCount()).isGreaterThan(first);
+        }
+    }
+
+    @Test
+    void testIncrementRecordsSendMetric() throws Exception {
+        final String index = "test-inc-records-send-async";
+        final int flushAfterNActions = 2;
+
+        try (final OpensearchAsyncWriter<Tuple2<Integer, String>> writer =
+                createWriter(context, index, flushAfterNActions, Long.MAX_VALUE)) {
+            final Counter recordsSend = context.getNumRecordsOutCounter();
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            // Update the previously written document
+            writer.write(Tuple2.of(1, "u" + buildMessage(2)), null);
+            // Delete the document
+            writer.write(Tuple2.of(1, "d" + buildMessage(3)), null);
+
+            writer.flush(true);
+
+            assertThat(recordsSend.getCount()).isEqualTo(3L);
+        }
+    }
+
+    @Test
+    void testCurrentSendTime() throws Exception {
+        final String index = "test-current-send-time-async";
+        final int flushAfterNActions = 2;
+
+        try (final OpensearchAsyncWriter<Tuple2<Integer, String>> writer =
+                createWriter(context, index, flushAfterNActions, Long.MAX_VALUE)) {
+            final Optional<Gauge<Long>> currentSendTime = context.getCurrentSendTimeGauge();
+
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+
+            writer.flush(true);
+
+            assertThat(currentSendTime).isPresent();
+            assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
+        }
+    }
+
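+    /** A failed bulk item (duplicate create) must be handed back for retry while the rest succeed. */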
+    @Test
+    @Timeout(5)
+    void testWriteError() throws Exception {
+        final String index = "test-bulk-flush-error-async";
+        final int maxBatchSize = 5;
+
+        try (final OpensearchAsyncWriter<Tuple2<Integer, String>> writer =
+                createWriter(context, index, maxBatchSize, Long.MAX_VALUE)) {
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+
+            // Force flush
+            writer.flush(true);
+            clientContext.assertThatIdsAreWritten(index, 1);
+            assertThat(requests).isEmpty();
+
+            // The "c" prefix should force the create mode and fail the bulk item request
+            // (duplicate)
+            writer.write(Tuple2.of(1, buildMessage(1, "c")), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+            writer.write(Tuple2.of(5, buildMessage(5)), null);
+
+            /* wait for the async bulk request to complete */
+            awaitForCompletion();
+
+            // Reaching maxBatchSize already triggered the flush; the failed
+            // create entry was handed back for retry
+            clientContext.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+            assertThat(requests).hasSize(1);
+        }
+    }
+
+    private OpensearchAsyncWriter<Tuple2<Integer, String>> createWriter(
+            Sink.InitContext context, String index, int maxBatchSize, long maxTimeInBufferMS) {
+        return new OpensearchAsyncWriter<Tuple2<Integer, String>>(
+                context,
+                new UpdatingElementConverter(index, clientContext.getDataFieldName()),
+                maxBatchSize,
+                1,
+                100,
+                Long.MAX_VALUE,
+                maxTimeInBufferMS,
+                Long.MAX_VALUE,
+                Collections.singletonList(HttpHost.create(OS_CONTAINER.getHttpHostAddress())),
+                new NetworkClientConfig(
+                        OS_CONTAINER.getUsername(),
+                        OS_CONTAINER.getPassword(),
+                        null,
+                        null,
+                        null,
+                        null,
+                        true),
+                Collections.emptyList()) {
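+            // Test hook: intercept the completion callback so each finished bulk request
+            // records the entries handed back for retry and signals awaitForCompletion()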
+            @Override
+            protected void submitRequestEntries(
+                    List<DocSerdeRequest<?>> requestEntries,
+                    Consumer<List<DocSerdeRequest<?>>> requestResult) {
+                super.submitRequestEntries(
+                        requestEntries,
+                        (entries) -> {
+                            requestResult.accept(entries);
+
+                            lock.lock();
+                            try {
+                                requests.addAll(entries);
+                                completed.signal();
+                            } finally {
+                                lock.unlock();
+                            }
+                        });
+            }
+        };
+    }
+
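+    /**
+     * Test converter that derives the request type from the first character of the payload:
+     * 'c' issues a create-only IndexRequest (fails on duplicates), 'd' a DeleteRequest and
+     * 'u' an UpdateRequest; any other payload falls back to a plain IndexRequest.
+     */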
+    private static class UpdatingElementConverter
+            implements ElementConverter<Tuple2<Integer, String>, DocSerdeRequest<?>> {
+        private static final long serialVersionUID = 1L;
+
+        private final String dataFieldName;
+        private final String index;
+
+        UpdatingElementConverter(String index, String dataFieldName) {
+            this.index = index;
+            this.dataFieldName = dataFieldName;
+        }
+
+        @Override
+        public DocSerdeRequest<?> apply(Tuple2<Integer, String> element, Context context) {
+            Map<String, Object> document = new HashMap<>();
+            document.put(dataFieldName, element.f1);
+
+            final char action = element.f1.charAt(0);
+            final String id = element.f0.toString();
+            switch (action) {
+                case 'c':
+                    return DocSerdeRequest.from(
+                            new IndexRequest(index).create(true).id(id).source(document));
+                case 'd':
+                    return DocSerdeRequest.from(new DeleteRequest(index).id(id));
+                case 'u':
+                    return DocSerdeRequest.from(
+                            new UpdateRequest().index(index).id(id).doc(document));
+                default:
+                    return DocSerdeRequest.from(new IndexRequest(index).id(id).source(document));
+            }
+        }
+    }
+
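+    /**
+     * Blocks until the submitRequestEntries hook signals that an async bulk request has
+     * completed; the signal is not latched, so timing-sensitive callers are guarded by
+     * {@code @Timeout}.
+     */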
+    private void awaitForCompletion() throws InterruptedException {
+        lock.lock();
+        try {
+            completed.await();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java
index c85e42b..482e2fe 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.java
@@ -146,29 +146,30 @@ private void runTest(
                         .setAllowInsecure(true)
                         .build();
 
-        final StreamExecutionEnvironment env = new LocalStreamEnvironment();
-        env.enableCheckpointing(100L);
-        if (!allowRestarts) {
-            env.setRestartStrategy(RestartStrategies.noRestart());
-        }
-        DataStream<Long> stream = env.fromSequence(1, 5);
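+        // Close the environment via try-with-resources so local cluster resources are
+        // released even when the job fails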
+        try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
+            env.enableCheckpointing(100L);
+            if (!allowRestarts) {
+                env.setRestartStrategy(RestartStrategies.noRestart());
+            }
+            DataStream<Long> stream = env.fromSequence(1, 5);
 
-        if (additionalMapper != null) {
-            stream = stream.map(additionalMapper);
-        }
+            if (additionalMapper != null) {
+                stream = stream.map(additionalMapper);
+            }
 
-        stream.map(
-                        new MapFunction<Long, Tuple2<Integer, String>>() {
-                            @Override
-                            public Tuple2<Integer, String> map(Long value) throws Exception {
-                                return Tuple2.of(
-                                        value.intValue(),
-                                        OpensearchTestClient.buildMessage(value.intValue()));
-                            }
-                        })
-                .sinkTo(sink);
-        env.execute();
-        context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+            stream.map(
+                            new MapFunction<Long, Tuple2<Integer, String>>() {
+                                @Override
+                                public Tuple2<Integer, String> map(Long value) throws Exception {
+                                    return Tuple2.of(
+                                            value.intValue(),
+                                            OpensearchTestClient.buildMessage(value.intValue()));
+                                }
+                            })
+                    .sinkTo(sink);
+            env.execute();
+            context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
+        }
     }
 
     private static List>>>
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java
index 322ffc1..b0d42a7 100644
--- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchTestClient.java
@@ -68,6 +68,10 @@ String getDataFieldName() {
         return DATA_FIELD_NAME;
     }
 
+    static String buildMessage(int id, String prefix) {
+        return prefix + buildMessage(id);
+    }
+
     static String buildMessage(int id) {
         return "test-" + id;
     }
diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java
new file mode 100644
index 0000000..9883b03
--- /dev/null
+++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+class TestConverter implements ElementConverter<Tuple2<Integer, String>, DocWriteRequest<?>> {
+
+    private final String index;
+    private final XContentBuilderProvider xContentBuilderProvider;
+    private final String dataFieldName;
+
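+    // Both factories produce the same IndexRequest; only the XContent wire format
+    // (JSON vs. SMILE) used to encode the document source differs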
+    public static TestConverter jsonConverter(String index, String dataFieldName) {
+        return new TestConverter(index, dataFieldName, XContentFactory::jsonBuilder);
+    }
+
+    public static TestConverter smileConverter(String index, String dataFieldName) {
+        return new TestConverter(index, dataFieldName, XContentFactory::smileBuilder);
+    }
+
+    private TestConverter(
+            String index, String dataFieldName, XContentBuilderProvider xContentBuilderProvider) {
+        this.dataFieldName = dataFieldName;
+        this.index = index;
+        this.xContentBuilderProvider = xContentBuilderProvider;
+    }
+
+    @Override
+    public DocWriteRequest<?> apply(Tuple2<Integer, String> element, Context context) {
+        return createIndexRequest(element);
+    }
+
+    public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+        Map<String, Object> document = new HashMap<>();
+        document.put(dataFieldName, element.f1);
+        try {
+            return new IndexRequest(index)
+                    .id(element.f0.toString())
+                    .source(xContentBuilderProvider.getBuilder().map(document));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
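+    // Serializable supplier of XContentBuilder so the converter can be serialized with
+    // the sink while deferring builder creation to request time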
+    @FunctionalInterface
+    private interface XContentBuilderProvider extends Serializable {
+        XContentBuilder getBuilder() throws IOException;
+    }
+}