Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull-based Ingestion] Add error handling strategy to pull-based ingestion #17427

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public KafkaOffset nextPointer() {
return new KafkaOffset(lastFetchedOffset + 1);
}

@Override
public KafkaOffset nextPointer(KafkaOffset pointer) {
return new KafkaOffset(pointer.getOffset() + 1);
}

@Override
public IngestionShardPointer earliestPointer() {
long startOffset = AccessController.doPrivileged(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -770,6 +771,30 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error.strategy";
public static final Setting<String> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = Setting.simpleString(
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
new Setting.Validator<>() {

@Override
public void validate(final String value) {
try {
IngestionErrorStrategy.ErrorStrategy.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid value for " + SETTING_INGESTION_SOURCE_ERROR_STRATEGY + " [" + value + "]");
}
}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
validate(value);
}
},
Property.IndexScope,
Property.Dynamic
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
Expand Down Expand Up @@ -1004,8 +1029,13 @@ public IngestionSource getIngestionSource() {
pointerInitResetType,
pointerInitResetValue
);

final String errorStrategyString = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
IngestionErrorStrategy.ErrorStrategy errorStrategy = IngestionErrorStrategy.ErrorStrategy.valueOf(
errorStrategyString.toUpperCase(Locale.ROOT)
);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.cluster.metadata;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;

import java.util.Map;
Expand All @@ -21,12 +22,19 @@
public class IngestionSource {
private String type;
private PointerInitReset pointerInitReset;
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
private Map<String, Object> params;

public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
public IngestionSource(
String type,
PointerInitReset pointerInitReset,
IngestionErrorStrategy.ErrorStrategy errorStrategy,
Map<String, Object> params
) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
this.errorStrategy = errorStrategy;
}

public String getType() {
Expand All @@ -37,6 +45,10 @@ public PointerInitReset getPointerInitReset() {
return pointerInitReset;
}

public IngestionErrorStrategy.ErrorStrategy getErrorStrategy() {
return errorStrategy;
}

public Map<String, Object> params() {
return params;
}
Expand All @@ -48,17 +60,30 @@ public boolean equals(Object o) {
IngestionSource ingestionSource = (IngestionSource) o;
return Objects.equals(type, ingestionSource.type)
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
&& Objects.equals(params, ingestionSource.params);
}

@Override
public int hashCode() {
return Objects.hash(type, pointerInitReset, params);
return Objects.hash(type, pointerInitReset, params, errorStrategy);
}

@Override
public String toString() {
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
return "IngestionSource{"
+ "type='"
+ type
+ '\''
+ ",pointer_init_reset='"
+ pointerInitReset
+ '\''
+ ",error_strategy='"
+ errorStrategy
+ '\''
+ ", params="
+ params
+ '}';
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public M getMessage() {
*/
T nextPointer();

/**
* @return the immediate next pointer from the provided start pointer
*/
T nextPointer(T startPointer);

/**
* @return the earliest pointer in the shard
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -189,7 +190,20 @@ public void start() {
}

String resetValue = ingestionSource.getPointerInitReset().getValue();
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue);
IngestionErrorStrategy ingestionErrorStrategy = IngestionErrorStrategy.create(
ingestionSource.getErrorStrategy(),
ingestionSource.getType()
);

streamPoller = new DefaultStreamPoller(
startPointer,
persistedPointers,
ingestionShardConsumer,
this,
resetState,
resetValue,
ingestionErrorStrategy
);
streamPoller.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This error handling strategy blocks on failures preventing processing of remaining updates in the ingestion source.
*/
public class BlockIngestionErrorStrategy implements IngestionErrorStrategy {
private static final Logger logger = LogManager.getLogger(BlockIngestionErrorStrategy.class);
private final String ingestionSource;

public BlockIngestionErrorStrategy(String ingestionSource) {
this.ingestionSource = ingestionSource;
}

@Override
public void handleError(Throwable e, ErrorStage stage) {
logger.error("Error processing update from {}: {}", ingestionSource, e);

// todo: record blocking update and emit metrics
}

@Override
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,25 @@ public class DefaultStreamPoller implements StreamPoller {
@Nullable
private IngestionShardPointer maxPersistedPointer;

private IngestionErrorStrategy errorStrategy;

public DefaultStreamPoller(
IngestionShardPointer startPointer,
Set<IngestionShardPointer> persistedPointers,
IngestionShardConsumer consumer,
IngestionEngine ingestionEngine,
ResetState resetState,
String resetValue
String resetValue,
IngestionErrorStrategy errorStrategy
) {
this(
startPointer,
persistedPointers,
consumer,
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
resetState,
resetValue
resetValue,
errorStrategy
);
}

Expand All @@ -88,7 +92,8 @@ public DefaultStreamPoller(
IngestionShardConsumer consumer,
MessageProcessorRunnable processorRunnable,
ResetState resetState,
String resetValue
String resetValue,
IngestionErrorStrategy errorStrategy
) {
this.consumer = Objects.requireNonNull(consumer);
this.resetState = resetState;
Expand All @@ -114,6 +119,7 @@ public DefaultStreamPoller(
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
)
);
this.errorStrategy = errorStrategy;
}

@Override
Expand All @@ -138,6 +144,9 @@ protected void startPoll() {
}
logger.info("Starting poller for shard {}", consumer.getShardId());

// track the last record successfully written to the blocking queue
IngestionShardPointer lastSuccessfulPointer = null;

while (true) {
try {
if (closed) {
Expand Down Expand Up @@ -205,6 +214,7 @@ protected void startPoll() {
continue;
}
blockingQueue.put(result);
lastSuccessfulPointer = result.getPointer();
logger.debug(
"Put message {} with pointer {} to the blocking queue",
String.valueOf(result.getMessage().getPayload()),
Expand All @@ -214,8 +224,18 @@ protected void startPoll() {
// update the batch start pointer to the next batch
batchStartPointer = consumer.nextPointer();
} catch (Throwable e) {
// TODO better error handling
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);

if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
// Blocking error encountered. Pause poller to stop processing remaining updates.
pause();
} else {
// Advance the batch start pointer to ignore the error and continue from next record
batchStartPointer = lastSuccessfulPointer == null
? consumer.nextPointer(batchStartPointer)
: consumer.nextPointer(lastSuccessfulPointer);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This error handling strategy drops failures and proceeds with remaining updates in the ingestion source.
*/
public class DropIngestionErrorStrategy implements IngestionErrorStrategy {
private static final Logger logger = LogManager.getLogger(DropIngestionErrorStrategy.class);
private final String ingestionSource;

public DropIngestionErrorStrategy(String ingestionSource) {
this.ingestionSource = ingestionSource;
}

@Override
public void handleError(Throwable e, ErrorStage stage) {
logger.error("Error processing update from {}: {}", ingestionSource, e);

// todo: record failed update stats and emit metrics
}

@Override
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
return false;
}

}
Loading
Loading