From 9e161cc097b34d3ea0d32787f337e630250547d3 Mon Sep 17 00:00:00 2001 From: Peter Fischer Date: Wed, 13 Mar 2024 08:24:15 +0100 Subject: [PATCH] [FLINK-33925][connectors/opensearch] Allow customising bulk failure handling Extracted `BulkResponseInspector` interface to allow custom handling of (partially) failed bulk requests. If not overridden, default behaviour remains unchanged and partial failures are escalated. * fixes https://issues.apache.org/jira/browse/FLINK-33925 * allows custom metrics to be exposed --- .../sink/BulkResponseInspector.java | 60 ++++++++ .../opensearch/sink/OpensearchSink.java | 14 +- .../sink/OpensearchSinkBuilder.java | 48 ++++++- .../opensearch/sink/OpensearchWriter.java | 114 +++++++++------ .../sink/DefaultBulkInspectorTest.java | 133 ++++++++++++++++++ .../sink/OpensearchSinkBuilderTest.java | 131 +++++++++++++++++ .../sink/OpensearchWriterITCase.java | 9 +- 7 files changed, 458 insertions(+), 51 deletions(-) create mode 100644 flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java new file mode 100644 index 0000000..3dbd6a8 --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.function.SerializableFunction; + +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; + +/** Callback for inspecting a {@link BulkResponse}. */ +@PublicEvolving +@FunctionalInterface +public interface BulkResponseInspector { + + /** + * Callback to inspect a {@code response} in the context of its {@code request}. It may throw a + * {@link org.apache.flink.util.FlinkRuntimeException} to indicate that the bulk failed + * (partially). + */ + void inspect(BulkRequest request, BulkResponse response); + + /** + * Factory interface for creating a {@link BulkResponseInspector} in the context of a sink. + * Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to capture custom metrics. + */ + @PublicEvolving + @FunctionalInterface + interface BulkResponseInspectorFactory + extends SerializableFunction< + BulkResponseInspectorFactory.InitContext, BulkResponseInspector> { + + /** + * The interface exposes a subset of {@link + * org.apache.flink.api.connector.sink2.Sink.InitContext}. + */ + interface InitContext { + + /** Returns: The metric group of the surrounding writer. */ + MetricGroup metricGroup(); + } + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java index c02b4fe..c01f0d0 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; import org.apache.http.HttpHost; @@ -60,7 +61,7 @@ public class OpensearchSink implements Sink { private final NetworkClientConfig networkClientConfig; private final DeliveryGuarantee deliveryGuarantee; private final RestClientFactory restClientFactory; - private final FailureHandler failureHandler; + private final BulkResponseInspectorFactory bulkResponseInspectorFactory; OpensearchSink( List hosts, @@ -69,7 +70,7 @@ public class OpensearchSink implements Sink { BulkProcessorConfig buildBulkProcessorConfig, NetworkClientConfig networkClientConfig, RestClientFactory restClientFactory, - FailureHandler failureHandler) { + BulkResponseInspectorFactory bulkResponseInspectorFactory) { this.hosts = checkNotNull(hosts); checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); this.emitter = checkNotNull(emitter); @@ -77,7 +78,7 @@ public class OpensearchSink implements Sink { this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig); this.networkClientConfig = checkNotNull(networkClientConfig); this.restClientFactory = checkNotNull(restClientFactory); - this.failureHandler = checkNotNull(failureHandler); + this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory); } @Override @@ -91,11 +92,16 @@ public SinkWriter createWriter(InitContext context) throws IOException { context.metricGroup(), context.getMailboxExecutor(), restClientFactory, - failureHandler); + bulkResponseInspectorFactory.apply(context::metricGroup)); } @VisibleForTesting DeliveryGuarantee getDeliveryGuarantee() { return deliveryGuarantee; } + + @VisibleForTesting + BulkResponseInspectorFactory getBulkResponseInspectorFactory() { + return bulkResponseInspectorFactory; + } } diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java index 736c607..ce7b8ae 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java @@ -20,6 +20,9 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; +import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler; import org.apache.flink.util.InstantiationUtil; import org.apache.http.HttpHost; @@ -27,7 +30,6 @@ import java.util.Arrays; import java.util.List; -import static org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -74,7 +76,8 @@ public class OpensearchSinkBuilder { private Integer socketTimeout; private Boolean allowInsecure; private RestClientFactory restClientFactory; - private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER; + private FailureHandler failureHandler = new DefaultFailureHandler(); + private BulkResponseInspectorFactory bulkResponseInspectorFactory; public OpensearchSinkBuilder() { restClientFactory = new DefaultRestClientFactory(); @@ -315,6 +318,20 @@ public OpensearchSinkBuilder setFailureHandler(FailureHandler failureHandler return self(); } + /** + * Overrides the default {@link BulkResponseInspectorFactory}. A custom {@link + * BulkResponseInspector}, for example, can change the failure handling and capture additional + * metrics. See {@link #failureHandler} for a simpler way of handling failures. + * + * @param bulkResponseInspectorFactory the factory + * @return this builder + */ + public OpensearchSinkBuilder setBulkResponseInspectorFactory( + BulkResponseInspectorFactory bulkResponseInspectorFactory) { + this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory); + return self(); + } + /** * Constructs the {@link OpensearchSink} with the properties configured this builder. * @@ -334,7 +351,13 @@ public OpensearchSink build() { bulkProcessorConfig, networkClientConfig, restClientFactory, - failureHandler); + getBulkResponseInspectorFactory()); + } + + protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() { + return this.bulkResponseInspectorFactory == null + ? new DefaultBulkResponseInspectorFactory(failureHandler) + : this.bulkResponseInspectorFactory; } private NetworkClientConfig buildNetworkClientConfig() { @@ -395,4 +418,23 @@ public String toString() { + '\'' + '}'; } + + /** + * Default factory for {@link FailureHandler}-bound {@link BulkResponseInspector + * BulkResponseInspectors}. A Static class is used instead of anonymous/lambda to avoid + * non-serializable references to {@link OpensearchSinkBuilder}. + */ + static class DefaultBulkResponseInspectorFactory implements BulkResponseInspectorFactory { + + private final FailureHandler failureHandler; + + DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) { + this.failureHandler = failureHandler; + } + + @Override + public BulkResponseInspector apply(InitContext context) { + return new DefaultBulkResponseInspector(failureHandler); + } + } } diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java index 68da301..d13973d 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java @@ -58,11 +58,6 @@ class OpensearchWriter implements SinkWriter { private static final Logger LOG = LoggerFactory.getLogger(OpensearchWriter.class); - public static final FailureHandler DEFAULT_FAILURE_HANDLER = - ex -> { - throw new FlinkRuntimeException(ex); - }; - private final OpensearchEmitter emitter; private final MailboxExecutor mailboxExecutor; private final boolean flushOnCheckpoint; @@ -70,7 +65,6 @@ class OpensearchWriter implements SinkWriter { private final RestHighLevelClient client; private final RequestIndexer requestIndexer; private final Counter numBytesOutCounter; - private final FailureHandler failureHandler; private long pendingActions = 0; private boolean checkpointInProgress = false; @@ -102,7 +96,7 @@ class OpensearchWriter implements SinkWriter { SinkWriterMetricGroup metricGroup, MailboxExecutor mailboxExecutor, RestClientFactory restClientFactory, - FailureHandler failureHandler) { + BulkResponseInspector bulkResponseInspector) { this.emitter = checkNotNull(emitter); this.flushOnCheckpoint = flushOnCheckpoint; this.mailboxExecutor = checkNotNull(mailboxExecutor); @@ -113,7 +107,8 @@ class OpensearchWriter implements SinkWriter { builder, new DefaultRestClientConfig(networkClientConfig)); this.client = new RestHighLevelClient(builder); - this.bulkProcessor = createBulkProcessor(bulkProcessorConfig); + this.bulkProcessor = + createBulkProcessor(bulkProcessorConfig, checkNotNull(bulkResponseInspector)); this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter()); checkNotNull(metricGroup); metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime); @@ -123,7 +118,6 @@ class OpensearchWriter implements SinkWriter { } catch (Exception e) { throw new FlinkRuntimeException("Failed to open the OpensearchEmitter", e); } - this.failureHandler = failureHandler; } @Override @@ -163,7 +157,8 @@ public void close() throws Exception { client.close(); } - private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) { + private BulkProcessor createBulkProcessor( + BulkProcessorConfig bulkProcessorConfig, BulkResponseInspector bulkResponseInspector) { final BulkProcessor.Builder builder = BulkProcessor.builder( @@ -180,7 +175,7 @@ public void accept( bulkResponseActionListener); } }, - new BulkListener()); + new BulkListener(bulkResponseInspector)); if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) { builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions()); @@ -223,6 +218,12 @@ public void accept( private class BulkListener implements BulkProcessor.Listener { + private final BulkResponseInspector bulkResponseInspector; + + public BulkListener(BulkResponseInspector bulkResponseInspector) { + this.bulkResponseInspector = bulkResponseInspector; + } + @Override public void beforeBulk(long executionId, BulkRequest request) { LOG.info("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); @@ -245,6 +246,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) }, "opensearchErrorCallback"); } + + private void extractFailures(BulkRequest request, BulkResponse response) { + bulkResponseInspector.inspect(request, response); + pendingActions -= request.numberOfActions(); + } } private void enqueueActionInMailbox( @@ -259,35 +265,6 @@ private void enqueueActionInMailbox( mailboxExecutor.execute(action, actionName); } - private void extractFailures(BulkRequest request, BulkResponse response) { - if (!response.hasFailures()) { - pendingActions -= request.numberOfActions(); - return; - } - - Throwable chainedFailures = null; - for (int i = 0; i < response.getItems().length; i++) { - final BulkItemResponse itemResponse = response.getItems()[i]; - if (!itemResponse.isFailed()) { - continue; - } - final Throwable failure = itemResponse.getFailure().getCause(); - if (failure == null) { - continue; - } - final RestStatus restStatus = itemResponse.getFailure().getStatus(); - final DocWriteRequest actionRequest = request.requests().get(i); - - chainedFailures = - firstOrSuppressed( - wrapException(restStatus, failure, actionRequest), chainedFailures); - } - if (chainedFailures == null) { - return; - } - failureHandler.onFailure(chainedFailures); - } - private static Throwable wrapException( RestStatus restStatus, Throwable rootFailure, DocWriteRequest actionRequest) { if (restStatus == null) { @@ -345,4 +322,61 @@ public void add(UpdateRequest... updateRequests) { } } } + + /** + * A strict implementation that fails if either the whole bulk request failed or any of its + * actions. + */ + static class DefaultBulkResponseInspector implements BulkResponseInspector { + + @VisibleForTesting final FailureHandler failureHandler; + + DefaultBulkResponseInspector() { + this(new DefaultFailureHandler()); + } + + DefaultBulkResponseInspector(FailureHandler failureHandler) { + this.failureHandler = checkNotNull(failureHandler); + } + + @Override + public void inspect(BulkRequest request, BulkResponse response) { + if (!response.hasFailures()) { + return; + } + + Throwable chainedFailures = null; + for (int i = 0; i < response.getItems().length; i++) { + final BulkItemResponse itemResponse = response.getItems()[i]; + if (!itemResponse.isFailed()) { + continue; + } + final Throwable failure = itemResponse.getFailure().getCause(); + if (failure == null) { + continue; + } + final RestStatus restStatus = itemResponse.getFailure().getStatus(); + final DocWriteRequest actionRequest = request.requests().get(i); + + chainedFailures = + firstOrSuppressed( + wrapException(restStatus, failure, actionRequest), chainedFailures); + } + if (chainedFailures == null) { + return; + } + failureHandler.onFailure(chainedFailures); + } + } + + static class DefaultFailureHandler implements FailureHandler { + + @Override + public void onFailure(Throwable failure) { + if (failure instanceof FlinkRuntimeException) { + throw (FlinkRuntimeException) failure; + } + throw new FlinkRuntimeException(failure); + } + } } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java new file mode 100644 index 0000000..e230da7 --- /dev/null +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.TestLoggerExtension; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; +import org.opensearch.action.DocWriteRequest.OpType; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkItemResponse.Failure; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; + +import java.io.IOException; + +/** Tests for {@link DefaultBulkResponseInspector}. */ +@ExtendWith(TestLoggerExtension.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class DefaultBulkResponseInspectorTest { + + @Test + void testPassWithoutFailures() { + final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); + Assertions.assertThatCode( + () -> + inspector.inspect( + new BulkRequest(), + new BulkResponse(new BulkItemResponse[] {}, 0))) + .doesNotThrowAnyException(); + } + + @Test + void testPassesDespiteChainedFailure() { + final DefaultBulkResponseInspector inspector = + new DefaultBulkResponseInspector((failure) -> {}); + Assertions.assertThatCode( + () -> { + final BulkRequest request = new BulkRequest(); + request.add( + new IndexRequest(), new DeleteRequest(), new DeleteRequest()); + + inspector.inspect( + request, + new BulkResponse( + new BulkItemResponse[] { + new BulkItemResponse( + 0, OpType.CREATE, (DocWriteResponse) null), + new BulkItemResponse( + 1, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + new IOException("A"))), + new BulkItemResponse( + 2, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + new IOException("B"))) + }, + 0)); + }) + .doesNotThrowAnyException(); + } + + @Test + void testThrowsChainedFailure() { + final IOException failureCause0 = new IOException("A"); + final IOException failureCause1 = new IOException("B"); + final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy( + () -> { + final BulkRequest request = new BulkRequest(); + request.add( + new IndexRequest(), new DeleteRequest(), new DeleteRequest()); + + inspector.inspect( + request, + new BulkResponse( + new BulkItemResponse[] { + new BulkItemResponse( + 0, OpType.CREATE, (DocWriteResponse) null), + new BulkItemResponse( + 1, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + failureCause0)), + new BulkItemResponse( + 2, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + failureCause1)) + }, + 0)); + }) + .withCause(failureCause0); + } +} diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java index 9939313..693ae44 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java @@ -17,8 +17,26 @@ package org.apache.flink.connector.opensearch.sink; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; +import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.SimpleUserCodeClassLoader; import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.UserCodeClassLoader; +import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; import org.junit.jupiter.api.DynamicTest; @@ -27,9 +45,12 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -111,6 +132,116 @@ void testThrowIfConnectionPathPrefixNotSet() { .isInstanceOf(NullPointerException.class); } + @Test + void testOverrideFailureHandler() { + final FailureHandler failureHandler = (failure) -> {}; + final OpensearchSink sink = + createMinimalBuilder().setFailureHandler(failureHandler).build(); + + final InitContext sinkInitContext = new MockInitContext(); + final BulkResponseInspector bulkResponseInspector = + sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup); + assertThat(bulkResponseInspector) + .isInstanceOf(DefaultBulkResponseInspector.class) + .extracting( + (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler) + .isEqualTo(failureHandler); + } + + @Test + void testOverrideBulkResponseInspectorFactory() { + final AtomicBoolean called = new AtomicBoolean(); + final BulkResponseInspectorFactory bulkResponseInspectorFactory = + initContext -> { + final MetricGroup metricGroup = initContext.metricGroup(); + metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions"); + called.set(true); + return (BulkResponseInspector) (request, response) -> {}; + }; + final OpensearchSink sink = + createMinimalBuilder() + .setBulkResponseInspectorFactory(bulkResponseInspectorFactory) + .build(); + + final InitContext sinkInitContext = new MockInitContext(); + + assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException(); + assertThat(called).isTrue(); + } + + private static class DummyMailboxExecutor implements MailboxExecutor { + private DummyMailboxExecutor() {} + + public void execute( + ThrowingRunnable command, + String descriptionFormat, + Object... descriptionArgs) {} + + public void yield() throws InterruptedException, FlinkRuntimeException {} + + public boolean tryYield() throws FlinkRuntimeException { + return false; + } + } + + private static class MockInitContext + implements Sink.InitContext, SerializationSchema.InitializationContext { + + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create( + OpensearchSinkBuilderTest.class.getClassLoader()); + } + + public MailboxExecutor getMailboxExecutor() { + return new OpensearchSinkBuilderTest.DummyMailboxExecutor(); + } + + public ProcessingTimeService getProcessingTimeService() { + return new TestProcessingTimeService(); + } + + public int getSubtaskId() { + return 0; + } + + public int getNumberOfParallelSubtasks() { + return 0; + } + + public int getAttemptNumber() { + return 0; + } + + public SinkWriterMetricGroup metricGroup() { + return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup()); + } + + public MetricGroup getMetricGroup() { + return this.metricGroup(); + } + + public OptionalLong getRestoredCheckpointId() { + return OptionalLong.empty(); + } + + public SerializationSchema.InitializationContext + asSerializationSchemaInitializationContext() { + return this; + } + + public boolean isObjectReuseEnabled() { + return false; + } + + public TypeSerializer createInputSerializer() { + throw new UnsupportedOperationException(); + } + + public JobID getJobId() { + throw new UnsupportedOperationException(); + } + } + private OpensearchSinkBuilder createEmptyBuilder() { return new OpensearchSinkBuilder<>(); } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java index afdf26b..fc083a4 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java @@ -21,6 +21,8 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.opensearch.OpensearchUtil; +import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler; import org.apache.flink.connector.opensearch.test.DockerImageVersions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -56,7 +58,6 @@ import java.util.Optional; import static org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage; -import static org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link OpensearchWriter}. */ @@ -280,7 +281,7 @@ private OpensearchWriter> createWriter( flushOnCheckpoint, bulkProcessorConfig, InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), - DEFAULT_FAILURE_HANDLER); + new DefaultFailureHandler()); } private OpensearchWriter> createWriter( @@ -306,7 +307,7 @@ private OpensearchWriter> createWriter( flushOnCheckpoint, bulkProcessorConfig, metricGroup, - DEFAULT_FAILURE_HANDLER); + new DefaultFailureHandler()); } private OpensearchWriter> createWriter( @@ -331,7 +332,7 @@ private OpensearchWriter> createWriter( metricGroup, new TestMailbox(), new DefaultRestClientFactory(), - failureHandler); + new DefaultBulkResponseInspector(failureHandler)); } private static class UpdatingEmitter implements OpensearchEmitter> {