Skip to content

Commit

Permalink
[FLINK-30488] OpenSearch implementation of Async Sink
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Jan 3, 2023
1 parent d0c5059 commit 46b018e
Show file tree
Hide file tree
Showing 15 changed files with 1,609 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSink;
import org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import org.apache.http.HttpHost;
import org.opensearch.action.index.IndexRequest;

import java.util.ArrayList;
import java.util.List;

/** End to end test for OpensearchAsyncSink. */
public class OpensearchAsyncSinkExample {

public static void main(String[] args) throws Exception {

final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
System.out.println(
"Missing parameters!\n" + "Usage: --numRecords <numRecords> --index <index>");
return;
}

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

DataStream<Tuple2<String, String>> source =
env.fromSequence(0, parameterTool.getInt("numRecords") - 1)
.flatMap(
new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
public void flatMap(
Long value, Collector<Tuple2<String, String>> out) {
final String key = String.valueOf(value);
final String message = "message #" + value;
out.collect(Tuple2.of(key, message + "update #1"));
out.collect(Tuple2.of(key, message + "update #2"));
}
});

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

OpensearchAsyncSinkBuilder<Tuple2<String, String>> osSinkBuilder =
OpensearchAsyncSink.<Tuple2<String, String>>builder()
.setHosts(new HttpHost("localhost:9200"))
.setElementConverter(
(element, context) ->
new IndexRequest("my-index")
.id(element.f0.toString())
.source(element.f1));

source.sinkTo(osSinkBuilder.build());

env.execute("Opensearch end to end async sink test example");
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
org.apache.flink.connector.opensearch.sink.OpensearchAsyncSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.opensearch.sink.OpensearchAsyncWriterITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
Expand Down
8 changes: 8 additions & 0 deletions flink-connector-opensearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- Opensearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.annotation.PublicEvolving;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;

/**
* Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable},
* required by AsyncSink scaffolding.
*
* @param <T> type of the write request
*/
@PublicEvolving
public class DocSerdeRequest<T> implements Serializable {
private static final long serialVersionUID = 1L;
private final DocWriteRequest<T> request;

private DocSerdeRequest(DocWriteRequest<T> request) {
this.request = request;
}

public DocWriteRequest<T> getRequest() {
return request;
}

static <T> DocSerdeRequest<T> from(DocWriteRequest<T> request) {
return new DocSerdeRequest<>(request);
}

static DocSerdeRequest<?> readFrom(long requestSize, DataInputStream in) throws IOException {
try (final StreamInput stream = new InputStreamStreamInput(in, requestSize)) {
return new DocSerdeRequest<>(readDocumentRequest(stream));
}
}

void writeTo(DataOutputStream out) throws IOException {
try (BytesStreamOutput stream = new BytesStreamOutput()) {
writeDocumentRequest(stream, request);
out.write(BytesReference.toBytes(stream.bytes()));
}
}

/** Read a document write (index/delete/update) request. */
private static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException {
byte type = in.readByte();
DocWriteRequest<?> docWriteRequest;
if (type == 0) {
docWriteRequest = new IndexRequest(in);
} else if (type == 1) {
docWriteRequest = new DeleteRequest(in);
} else if (type == 2) {
docWriteRequest = new UpdateRequest(in);
} else {
throw new IllegalStateException("Invalid request type [" + type + " ]");
}
return docWriteRequest;
}

/** Write a document write (index/delete/update) request. */
private static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)
throws IOException {
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
((IndexRequest) request).writeTo(out);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
((DeleteRequest) request).writeTo(out);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
((UpdateRequest) request).writeTo(out);
} else {
throw new IllegalStateException(
"Invalid request [" + request.getClass().getSimpleName() + " ]");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import org.apache.http.HttpHost;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Apache Flink's Async Sink to insert or update data in an Opensearch index (see please {@link
* OpensearchAsyncWriter}).
*
* @param <InputT> type of the records converted to Opensearch actions (instances of {@link
* DocSerdeRequest})
* @see OpensearchAsyncSinkBuilder on how to construct a OpensearchAsyncSink
*/
@PublicEvolving
public class OpensearchAsyncSink<InputT> extends AsyncSinkBase<InputT, DocSerdeRequest<?>> {
private static final long serialVersionUID = 1L;

private final List<HttpHost> hosts;
private final NetworkClientConfig networkClientConfig;

/**
* Constructor creating an Opensearch async sink.
*
* @param maxBatchSize the maximum size of a batch of entries that may be sent
* @param maxInFlightRequests he maximum number of in flight requests that may exist, if any
* more in flight requests need to be initiated once the maximum has been reached, then it
* will be blocked until some have completed
* @param maxBufferedRequests the maximum number of elements held in the buffer, requests to add
* elements will be blocked while the number of elements in the buffer is at the maximum
* @param maxBatchSizeInBytes the maximum size of a batch of entries that may be sent to KDS
* measured in bytes
* @param maxTimeInBufferMS the maximum amount of time an entry is allowed to live in the
* buffer, if any element reaches this age, the entire buffer will be flushed immediately
* @param maxRecordSizeInBytes the maximum size of a record the sink will accept into the
* buffer, a record of size larger than this will be rejected when passed to the sink
* @param elementConverter converting incoming records to Opensearch write document requests
* @param hosts the reachable Opensearch cluster nodes
* @param networkClientConfig describing properties of the network connection used to connect to
* the Opensearch cluster
*/
OpensearchAsyncSink(
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
ElementConverter<InputT, DocSerdeRequest<?>> elementConverter,
List<HttpHost> hosts,
NetworkClientConfig networkClientConfig) {
super(
elementConverter,
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes);
this.hosts = checkNotNull(hosts);
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
this.networkClientConfig = checkNotNull(networkClientConfig);
}

/**
* Create a {@link OpensearchAsyncSinkBuilder} to construct a new {@link OpensearchAsyncSink}.
*
* @param <InputT> type of incoming records
* @return {@link OpensearchAsyncSinkBuilder}
*/
public static <InputT> OpensearchAsyncSinkBuilder<InputT> builder() {
return new OpensearchAsyncSinkBuilder<>();
}

@Internal
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<DocSerdeRequest<?>>> createWriter(
InitContext context) throws IOException {
return new OpensearchAsyncWriter<>(
context,
getElementConverter(),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
hosts,
networkClientConfig,
Collections.emptyList());
}

@Internal
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<DocSerdeRequest<?>>> restoreWriter(
InitContext context,
Collection<BufferedRequestState<DocSerdeRequest<?>>> recoveredState)
throws IOException {
return new OpensearchAsyncWriter<>(
context,
getElementConverter(),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
hosts,
networkClientConfig,
recoveredState);
}

@Internal
@Override
public SimpleVersionedSerializer<BufferedRequestState<DocSerdeRequest<?>>>
getWriterStateSerializer() {
return new OpensearchWriterStateSerializer();
}
}
Loading

0 comments on commit 46b018e

Please sign in to comment.