Skip to content

Commit abacb47

Browse files
committed
[Flink-33859] Support OpenSearch v2
1 parent bdbaee7 commit abacb47

File tree

21 files changed

+551
-204
lines changed

21 files changed

+551
-204
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
Constructor <org.apache.flink.connector.opensearch.sink.BulkProcessorConfig.<init>(int, int, long, org.apache.flink.connector.opensearch.sink.FlushBackoffType, int, long)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (BulkProcessorConfig.java:44)
22
Constructor <org.apache.flink.connector.opensearch.table.OpensearchConfiguration.<init>(org.apache.flink.configuration.ReadableConfig)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (OpensearchConfiguration.java:61)
3-
Method <org.apache.flink.connector.opensearch.table.IndexGeneratorFactory.createRuntimeIndexGenerator(java.lang.String, [Ljava.lang.String;, [Lorg.apache.flink.table.types.DataType;, org.apache.flink.connector.opensearch.table.IndexGeneratorFactory$IndexHelper, java.time.ZoneId)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (IndexGeneratorFactory.java:0)
3+
Method <org.apache.flink.connector.opensearch.table.IndexGeneratorFactory.createRuntimeIndexGenerator(java.lang.String, [Ljava.lang.String;, [Lorg.apache.flink.table.types.DataType;, org.apache.flink.connector.opensearch.table.IndexGeneratorFactory$IndexHelper, java.time.ZoneId)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (IndexGeneratorFactory.java:0)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.apache.flink.connector.opensearch.sink;
2+
3+
import org.apache.flink.util.FlinkRuntimeException;
4+
5+
class DefaultFailureHandler implements FailureHandler {
6+
7+
@Override
8+
public void onFailure(Throwable failure) {
9+
if (failure instanceof FlinkRuntimeException) {
10+
throw (FlinkRuntimeException) failure;
11+
}
12+
throw new FlinkRuntimeException(failure);
13+
}
14+
}

flink-connector-opensearch/archunit-violations/4382f1f0-807a-45ff-97d8-42f72b6e9484

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not
1515
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
1616
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
1717
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
18-
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
18+
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule

flink-connector-opensearch/archunit-violations/5c4a6228-f9cb-4828-9625-43c57d133967

Lines changed: 33 additions & 33 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.opensearch.sink;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.util.FlinkRuntimeException;
23+
24+
import org.opensearch.action.DocWriteRequest;
25+
import org.opensearch.action.bulk.BulkItemResponse;
26+
import org.opensearch.action.bulk.BulkRequest;
27+
import org.opensearch.action.bulk.BulkResponse;
28+
import org.opensearch.rest.RestStatus;
29+
30+
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
31+
import static org.apache.flink.util.Preconditions.checkNotNull;
32+
33+
/**
34+
* A strict implementation that fails if either the whole bulk request failed or any of its actions.
35+
*/
36+
class DefaultBulkResponseInspector implements BulkResponseInspector {
37+
38+
@VisibleForTesting final FailureHandler failureHandler;
39+
40+
DefaultBulkResponseInspector() {
41+
this(new DefaultFailureHandler());
42+
}
43+
44+
DefaultBulkResponseInspector(FailureHandler failureHandler) {
45+
this.failureHandler = checkNotNull(failureHandler);
46+
}
47+
48+
@Override
49+
public void inspect(BulkRequest request, BulkResponse response) {
50+
if (!response.hasFailures()) {
51+
return;
52+
}
53+
54+
Throwable chainedFailures = null;
55+
for (int i = 0; i < response.getItems().length; i++) {
56+
final BulkItemResponse itemResponse = response.getItems()[i];
57+
if (!itemResponse.isFailed()) {
58+
continue;
59+
}
60+
final Throwable failure = itemResponse.getFailure().getCause();
61+
if (failure == null) {
62+
continue;
63+
}
64+
final RestStatus restStatus = itemResponse.getFailure().getStatus();
65+
final DocWriteRequest<?> actionRequest = request.requests().get(i);
66+
67+
chainedFailures =
68+
firstOrSuppressed(
69+
wrapException(restStatus, failure, actionRequest), chainedFailures);
70+
}
71+
if (chainedFailures == null) {
72+
return;
73+
}
74+
failureHandler.onFailure(chainedFailures);
75+
}
76+
77+
private static Throwable wrapException(
78+
RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) {
79+
if (restStatus == null) {
80+
return new FlinkRuntimeException(
81+
String.format("Single action %s of bulk request failed.", actionRequest),
82+
rootFailure);
83+
} else {
84+
return new FlinkRuntimeException(
85+
String.format(
86+
"Single action %s of bulk request failed with status %s.",
87+
actionRequest, restStatus.getStatus()),
88+
rootFailure);
89+
}
90+
}
91+
}

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.apache.flink.annotation.PublicEvolving;
2222
import org.apache.flink.connector.base.DeliveryGuarantee;
2323
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
24-
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
25-
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
2624
import org.apache.flink.util.InstantiationUtil;
2725

2826
import org.apache.http.HttpHost;

flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.action.ActionListener;
3232
import org.opensearch.action.DocWriteRequest;
3333
import org.opensearch.action.bulk.BackoffPolicy;
34-
import org.opensearch.action.bulk.BulkItemResponse;
3534
import org.opensearch.action.bulk.BulkProcessor;
3635
import org.opensearch.action.bulk.BulkRequest;
3736
import org.opensearch.action.bulk.BulkResponse;
@@ -53,7 +52,6 @@
5352
import java.util.List;
5453
import java.util.function.BiConsumer;
5554

56-
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
5755
import static org.apache.flink.util.Preconditions.checkNotNull;
5856

5957
class OpensearchWriter<IN> implements SinkWriter<IN> {
@@ -325,63 +323,6 @@ public void add(UpdateRequest... updateRequests) {
325323
}
326324
}
327325

328-
/**
329-
* A strict implementation that fails if either the whole bulk request failed or any of its
330-
* actions.
331-
*/
332-
static class DefaultBulkResponseInspector implements BulkResponseInspector {
333-
334-
@VisibleForTesting final FailureHandler failureHandler;
335-
336-
DefaultBulkResponseInspector() {
337-
this(new DefaultFailureHandler());
338-
}
339-
340-
DefaultBulkResponseInspector(FailureHandler failureHandler) {
341-
this.failureHandler = checkNotNull(failureHandler);
342-
}
343-
344-
@Override
345-
public void inspect(BulkRequest request, BulkResponse response) {
346-
if (!response.hasFailures()) {
347-
return;
348-
}
349-
350-
Throwable chainedFailures = null;
351-
for (int i = 0; i < response.getItems().length; i++) {
352-
final BulkItemResponse itemResponse = response.getItems()[i];
353-
if (!itemResponse.isFailed()) {
354-
continue;
355-
}
356-
final Throwable failure = itemResponse.getFailure().getCause();
357-
if (failure == null) {
358-
continue;
359-
}
360-
final RestStatus restStatus = itemResponse.getFailure().getStatus();
361-
final DocWriteRequest<?> actionRequest = request.requests().get(i);
362-
363-
chainedFailures =
364-
firstOrSuppressed(
365-
wrapException(restStatus, failure, actionRequest), chainedFailures);
366-
}
367-
if (chainedFailures == null) {
368-
return;
369-
}
370-
failureHandler.onFailure(chainedFailures);
371-
}
372-
}
373-
374-
static class DefaultFailureHandler implements FailureHandler {
375-
376-
@Override
377-
public void onFailure(Throwable failure) {
378-
if (failure instanceof FlinkRuntimeException) {
379-
throw (FlinkRuntimeException) failure;
380-
}
381-
throw new FlinkRuntimeException(failure);
382-
}
383-
}
384-
385326
@Internal
386327
interface BulkRequestConsumerFactory
387328
extends BiConsumer<BulkRequest, ActionListener<BulkResponse>> {}

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.flink.connector.opensearch.sink;
2020

21-
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
2221
import org.apache.flink.util.FlinkRuntimeException;
2322
import org.apache.flink.util.TestLoggerExtension;
2423

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.api.connector.sink2.Sink.InitContext;
2222
import org.apache.flink.connector.base.DeliveryGuarantee;
2323
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
24-
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
2524
import org.apache.flink.metrics.MetricGroup;
2625
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
2726
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.apache.flink.api.connector.sink2.SinkWriter;
2222
import org.apache.flink.api.java.tuple.Tuple2;
2323
import org.apache.flink.connector.opensearch.OpensearchUtil;
24-
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
25-
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
2624
import org.apache.flink.connector.opensearch.test.DockerImageVersions;
2725
import org.apache.flink.metrics.Counter;
2826
import org.apache.flink.metrics.Gauge;
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
org.apache.flink.connector.opensearch.sink.Opensearch2SinkITCase does not satisfy: only one of the following predicates match:\
22
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
3-
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
3+
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
44
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
55
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
66
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
77
org.apache.flink.connector.opensearch.sink.Opensearch2WriterITCase does not satisfy: only one of the following predicates match:\
88
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
9-
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
9+
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
1010
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
1111
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
1212
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
1313
org.apache.flink.connector.opensearch.table.Opensearch2DynamicSinkITCase does not satisfy: only one of the following predicates match:\
1414
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
15-
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
15+
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
1616
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
1717
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
18-
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
18+
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule

0 commit comments

Comments
 (0)