Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33925][connectors/opensearch] Allow customising bulk failure handling #39

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.function.SerializableFunction;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;

/** Callback for inspecting a {@link BulkResponse}. */
@PublicEvolving
@FunctionalInterface
public interface BulkResponseInspector {

/**
* Callback to inspect a {@code response} in the context of its {@code request}. It may throw a
* {@link org.apache.flink.util.FlinkRuntimeException} to indicate that the bulk failed
* (partially).
*/
void inspect(BulkRequest request, BulkResponse response);

/**
* Factory interface for creating a {@link BulkResponseInspector} in the context of a sink.
* Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to capture custom metrics.
*/
@PublicEvolving
@FunctionalInterface
interface BulkResponseInspectorFactory
extends SerializableFunction<
BulkResponseInspectorFactory.InitContext, BulkResponseInspector> {

/**
* The interface exposes a subset of {@link
* org.apache.flink.api.connector.sink2.Sink.InitContext}.
*/
interface InitContext {

/** Returns: The metric group of the surrounding writer. */
MetricGroup metricGroup();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;

import org.apache.http.HttpHost;

Expand Down Expand Up @@ -60,7 +61,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
private final NetworkClientConfig networkClientConfig;
private final DeliveryGuarantee deliveryGuarantee;
private final RestClientFactory restClientFactory;
private final FailureHandler failureHandler;
private final BulkResponseInspectorFactory bulkResponseInspectorFactory;

OpensearchSink(
List<HttpHost> hosts,
Expand All @@ -69,15 +70,15 @@ public class OpensearchSink<IN> implements Sink<IN> {
BulkProcessorConfig buildBulkProcessorConfig,
NetworkClientConfig networkClientConfig,
RestClientFactory restClientFactory,
FailureHandler failureHandler) {
BulkResponseInspectorFactory bulkResponseInspectorFactory) {
this.hosts = checkNotNull(hosts);
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
this.emitter = checkNotNull(emitter);
this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig);
this.networkClientConfig = checkNotNull(networkClientConfig);
this.restClientFactory = checkNotNull(restClientFactory);
this.failureHandler = checkNotNull(failureHandler);
this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory);
}

@Override
Expand All @@ -91,11 +92,16 @@ public SinkWriter<IN> createWriter(InitContext context) throws IOException {
context.metricGroup(),
context.getMailboxExecutor(),
restClientFactory,
failureHandler);
bulkResponseInspectorFactory.apply(context::metricGroup));
}

@VisibleForTesting
DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}

@VisibleForTesting
BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
return bulkResponseInspectorFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
import org.apache.flink.util.InstantiationUtil;

import org.apache.http.HttpHost;

import java.util.Arrays;
import java.util.List;

import static org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -74,7 +76,8 @@ public class OpensearchSinkBuilder<IN> {
private Integer socketTimeout;
private Boolean allowInsecure;
private RestClientFactory restClientFactory;
private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER;
private FailureHandler failureHandler = new DefaultFailureHandler();
private BulkResponseInspectorFactory bulkResponseInspectorFactory;

public OpensearchSinkBuilder() {
restClientFactory = new DefaultRestClientFactory();
Expand Down Expand Up @@ -315,6 +318,20 @@ public OpensearchSinkBuilder<IN> setFailureHandler(FailureHandler failureHandler
return self();
}

/**
* Overrides the default {@link BulkResponseInspectorFactory}. A custom {@link
* BulkResponseInspector}, for example, can change the failure handling and capture additional
* metrics. See {@link #failureHandler} for a simpler way of handling failures.
*
* @param bulkResponseInspectorFactory the factory
* @return this builder
*/
public OpensearchSinkBuilder<IN> setBulkResponseInspectorFactory(
BulkResponseInspectorFactory bulkResponseInspectorFactory) {
this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory);
return self();
}

/**
* Constructs the {@link OpensearchSink} with the properties configured this builder.
*
Expand All @@ -334,7 +351,13 @@ public OpensearchSink<IN> build() {
bulkProcessorConfig,
networkClientConfig,
restClientFactory,
failureHandler);
getBulkResponseInspectorFactory());
}

protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
return this.bulkResponseInspectorFactory == null
? new DefaultBulkResponseInspectorFactory(failureHandler)
: this.bulkResponseInspectorFactory;
}

private NetworkClientConfig buildNetworkClientConfig() {
Expand Down Expand Up @@ -395,4 +418,23 @@ public String toString() {
+ '\''
+ '}';
}

/**
* Default factory for {@link FailureHandler}-bound {@link BulkResponseInspector
* BulkResponseInspectors}. A Static class is used instead of anonymous/lambda to avoid
* non-serializable references to {@link OpensearchSinkBuilder}.
*/
static class DefaultBulkResponseInspectorFactory implements BulkResponseInspectorFactory {

private final FailureHandler failureHandler;

DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) {
this.failureHandler = failureHandler;
}

@Override
public BulkResponseInspector apply(InitContext context) {
return new DefaultBulkResponseInspector(failureHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,13 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchWriter.class);

public static final FailureHandler DEFAULT_FAILURE_HANDLER =
ex -> {
throw new FlinkRuntimeException(ex);
};

private final OpensearchEmitter<? super IN> emitter;
private final MailboxExecutor mailboxExecutor;
private final boolean flushOnCheckpoint;
private final BulkProcessor bulkProcessor;
private final RestHighLevelClient client;
private final RequestIndexer requestIndexer;
private final Counter numBytesOutCounter;
private final FailureHandler failureHandler;

private long pendingActions = 0;
private boolean checkpointInProgress = false;
Expand Down Expand Up @@ -102,7 +96,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
SinkWriterMetricGroup metricGroup,
MailboxExecutor mailboxExecutor,
RestClientFactory restClientFactory,
FailureHandler failureHandler) {
BulkResponseInspector bulkResponseInspector) {
this.emitter = checkNotNull(emitter);
this.flushOnCheckpoint = flushOnCheckpoint;
this.mailboxExecutor = checkNotNull(mailboxExecutor);
Expand All @@ -113,7 +107,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
builder, new DefaultRestClientConfig(networkClientConfig));

this.client = new RestHighLevelClient(builder);
this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
this.bulkProcessor =
createBulkProcessor(bulkProcessorConfig, checkNotNull(bulkResponseInspector));
this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
checkNotNull(metricGroup);
metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
Expand All @@ -123,7 +118,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to open the OpensearchEmitter", e);
}
this.failureHandler = failureHandler;
}

@Override
Expand Down Expand Up @@ -163,7 +157,8 @@ public void close() throws Exception {
client.close();
}

private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) {
private BulkProcessor createBulkProcessor(
BulkProcessorConfig bulkProcessorConfig, BulkResponseInspector bulkResponseInspector) {

final BulkProcessor.Builder builder =
BulkProcessor.builder(
Expand All @@ -180,7 +175,7 @@ public void accept(
bulkResponseActionListener);
}
},
new BulkListener());
new BulkListener(bulkResponseInspector));

if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
Expand Down Expand Up @@ -223,6 +218,12 @@ public void accept(

private class BulkListener implements BulkProcessor.Listener {

private final BulkResponseInspector bulkResponseInspector;

public BulkListener(BulkResponseInspector bulkResponseInspector) {
this.bulkResponseInspector = bulkResponseInspector;
}

@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.info("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
Expand All @@ -245,6 +246,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
},
"opensearchErrorCallback");
}

private void extractFailures(BulkRequest request, BulkResponse response) {
bulkResponseInspector.inspect(request, response);
pendingActions -= request.numberOfActions();
}
}

private void enqueueActionInMailbox(
Expand All @@ -259,35 +265,6 @@ private void enqueueActionInMailbox(
mailboxExecutor.execute(action, actionName);
}

private void extractFailures(BulkRequest request, BulkResponse response) {
if (!response.hasFailures()) {
pendingActions -= request.numberOfActions();
return;
}

Throwable chainedFailures = null;
for (int i = 0; i < response.getItems().length; i++) {
final BulkItemResponse itemResponse = response.getItems()[i];
if (!itemResponse.isFailed()) {
continue;
}
final Throwable failure = itemResponse.getFailure().getCause();
if (failure == null) {
continue;
}
final RestStatus restStatus = itemResponse.getFailure().getStatus();
final DocWriteRequest<?> actionRequest = request.requests().get(i);

chainedFailures =
firstOrSuppressed(
wrapException(restStatus, failure, actionRequest), chainedFailures);
}
if (chainedFailures == null) {
return;
}
failureHandler.onFailure(chainedFailures);
}

private static Throwable wrapException(
RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) {
if (restStatus == null) {
Expand Down Expand Up @@ -345,4 +322,61 @@ public void add(UpdateRequest... updateRequests) {
}
}
}

/**
* A strict implementation that fails if either the whole bulk request failed or any of its
* actions.
*/
static class DefaultBulkResponseInspector implements BulkResponseInspector {

@VisibleForTesting final FailureHandler failureHandler;

DefaultBulkResponseInspector() {
this(new DefaultFailureHandler());
}

DefaultBulkResponseInspector(FailureHandler failureHandler) {
this.failureHandler = checkNotNull(failureHandler);
}

@Override
public void inspect(BulkRequest request, BulkResponse response) {
if (!response.hasFailures()) {
return;
}

Throwable chainedFailures = null;
for (int i = 0; i < response.getItems().length; i++) {
final BulkItemResponse itemResponse = response.getItems()[i];
if (!itemResponse.isFailed()) {
continue;
}
final Throwable failure = itemResponse.getFailure().getCause();
if (failure == null) {
continue;
}
final RestStatus restStatus = itemResponse.getFailure().getStatus();
final DocWriteRequest<?> actionRequest = request.requests().get(i);

chainedFailures =
firstOrSuppressed(
wrapException(restStatus, failure, actionRequest), chainedFailures);
}
if (chainedFailures == null) {
return;
}
failureHandler.onFailure(chainedFailures);
}
}

static class DefaultFailureHandler implements FailureHandler {

@Override
public void onFailure(Throwable failure) {
if (failure instanceof FlinkRuntimeException) {
throw (FlinkRuntimeException) failure;
}
throw new FlinkRuntimeException(failure);
}
}
}
Loading
Loading