Add error handling strategy to pull-based ingestion
Signed-off-by: Varun Bharadwaj <[email protected]>
varunbharadwaj committed Feb 26, 2025
1 parent 0ffed5e commit 474008b
Showing 14 changed files with 404 additions and 32 deletions.
@@ -132,6 +132,11 @@ public KafkaOffset nextPointer() {
return new KafkaOffset(lastFetchedOffset + 1);
}

@Override
public KafkaOffset nextPointer(KafkaOffset pointer) {
return new KafkaOffset(pointer.getOffset() + 1);
}

@Override
public IngestionShardPointer earliestPointer() {
long startOffset = AccessController.doPrivileged(
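The new overload computes the successor of an arbitrary pointer rather than of the consumer's last fetched offset; the poller further down uses it to resume past a failed record. A minimal sketch, where the offset value and the consumer variable are illustrative:

    KafkaOffset failedAt = new KafkaOffset(41);               // pointer of a record that failed; value is made up
    KafkaOffset resumeFrom = consumer.nextPointer(failedAt);  // yields KafkaOffset(42), the next record to poll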
@@ -71,6 +71,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;
@@ -770,6 +771,30 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error.strategy";
public static final Setting<String> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = Setting.simpleString(
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
new Setting.Validator<>() {

@Override
public void validate(final String value) {
try {
IngestionErrorStrategy.ErrorStrategy.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid value for " + SETTING_INGESTION_SOURCE_ERROR_STRATEGY + " [" + value + "]");
}
}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
validate(value);
}
},
Property.IndexScope,
Property.Dynamic
);
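As a rough sketch of how the new setting might be supplied when creating a pull-based index; the use of "block" as a value is an assumption inferred from BlockIngestionErrorStrategy further down, while "drop" is the default declared above:

    Settings settings = Settings.builder()
        // the validator upper-cases the value, so lower case is accepted
        .put(IndexMetadata.SETTING_INGESTION_SOURCE_ERROR_STRATEGY, "block")
        .build();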

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
@@ -1004,8 +1029,13 @@ public IngestionSource getIngestionSource() {
pointerInitResetType,
pointerInitResetValue
);

final String errorStrategyString = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
IngestionErrorStrategy.ErrorStrategy errorStrategy = IngestionErrorStrategy.ErrorStrategy.valueOf(
errorStrategyString.toUpperCase(Locale.ROOT)
);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
}
return null;
}
@@ -9,6 +9,7 @@
package org.opensearch.cluster.metadata;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;

import java.util.Map;
@@ -21,12 +22,19 @@
public class IngestionSource {
private String type;
private PointerInitReset pointerInitReset;
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
private Map<String, Object> params;

public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
public IngestionSource(
String type,
PointerInitReset pointerInitReset,
IngestionErrorStrategy.ErrorStrategy errorStrategy,
Map<String, Object> params
) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
this.errorStrategy = errorStrategy;
}

public String getType() {
@@ -37,6 +45,10 @@ public PointerInitReset getPointerInitReset() {
return pointerInitReset;
}

public IngestionErrorStrategy.ErrorStrategy getErrorStrategy() {
return errorStrategy;
}

public Map<String, Object> params() {
return params;
}
@@ -48,17 +60,30 @@ public boolean equals(Object o) {
IngestionSource ingestionSource = (IngestionSource) o;
return Objects.equals(type, ingestionSource.type)
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
&& Objects.equals(params, ingestionSource.params);
}

@Override
public int hashCode() {
return Objects.hash(type, pointerInitReset, params);
return Objects.hash(type, pointerInitReset, params, errorStrategy);
}

@Override
public String toString() {
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
return "IngestionSource{"
+ "type='"
+ type
+ '\''
+ ",pointer_init_reset='"
+ pointerInitReset
+ '\''
+ ",error_strategy='"
+ errorStrategy
+ '\''
+ ", params="
+ params
+ '}';
}

/**
@@ -266,6 +266,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
@@ -72,6 +72,11 @@ public M getMessage() {
*/
T nextPointer();

/**
* @return the immediate next pointer from the provided start pointer
*/
T nextPointer(T startPointer);

/**
* @return the earliest pointer in the shard
*/
@@ -55,6 +55,7 @@
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;
@@ -189,7 +190,20 @@ public void start() {
}

String resetValue = ingestionSource.getPointerInitReset().getValue();
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue);
IngestionErrorStrategy ingestionErrorStrategy = IngestionErrorStrategy.create(
ingestionSource.getErrorStrategy(),
ingestionSource.getType()
);

streamPoller = new DefaultStreamPoller(
startPointer,
persistedPointers,
ingestionShardConsumer,
this,
resetState,
resetValue,
ingestionErrorStrategy
);
streamPoller.start();
}
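IngestionErrorStrategy itself is among the 14 changed files but is not expanded in this view; the shape below is a sketch reconstructed only from the call sites visible in this diff, so any member not referenced above is an assumption:

    public interface IngestionErrorStrategy {
        void handleError(Throwable e, ErrorStage stage);

        boolean shouldPauseIngestion(Throwable e, ErrorStage stage);

        // Factory used in IngestionEngine.start(); presumably maps the enum to the two implementations below.
        static IngestionErrorStrategy create(ErrorStrategy errorStrategy, String ingestionSource) {
            return errorStrategy == ErrorStrategy.BLOCK
                ? new BlockIngestionErrorStrategy(ingestionSource)
                : new DropIngestionErrorStrategy(ingestionSource);
        }

        // DROP is the setting default above; BLOCK is assumed from BlockIngestionErrorStrategy.
        enum ErrorStrategy {
            DROP,
            BLOCK
        }

        // POLLING is the only stage referenced in the visible hunks; others may exist.
        enum ErrorStage {
            POLLING
        }
    }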

@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This error handling strategy blocks on failures, preventing processing of the remaining updates in the ingestion source.
*/
public class BlockIngestionErrorStrategy implements IngestionErrorStrategy {
private static final Logger logger = LogManager.getLogger(BlockIngestionErrorStrategy.class);
private final String ingestionSource;

public BlockIngestionErrorStrategy(String ingestionSource) {
this.ingestionSource = ingestionSource;
}

@Override
public void handleError(Throwable e, ErrorStage stage) {
logger.error("Error processing update from {}: {}", ingestionSource, e);

// todo: record blocking update and emit metrics
}

@Override
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
return true;
}
}
@@ -64,21 +64,25 @@ public class DefaultStreamPoller implements StreamPoller {
@Nullable
private IngestionShardPointer maxPersistedPointer;

private IngestionErrorStrategy errorStrategy;

public DefaultStreamPoller(
IngestionShardPointer startPointer,
Set<IngestionShardPointer> persistedPointers,
IngestionShardConsumer consumer,
IngestionEngine ingestionEngine,
ResetState resetState,
String resetValue
String resetValue,
IngestionErrorStrategy errorStrategy
) {
this(
startPointer,
persistedPointers,
consumer,
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
resetState,
resetValue
resetValue,
errorStrategy
);
}

@@ -88,7 +92,8 @@ public DefaultStreamPoller(
IngestionShardConsumer consumer,
MessageProcessorRunnable processorRunnable,
ResetState resetState,
String resetValue
String resetValue,
IngestionErrorStrategy errorStrategy
) {
this.consumer = Objects.requireNonNull(consumer);
this.resetState = resetState;
@@ -114,6 +119,7 @@ public DefaultStreamPoller(
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
)
);
this.errorStrategy = errorStrategy;
}

@Override
@@ -138,6 +144,9 @@ protected void startPoll() {
}
logger.info("Starting poller for shard {}", consumer.getShardId());

// track the last record successfully written to the blocking queue
IngestionShardPointer lastSuccessfulPointer = null;

while (true) {
try {
if (closed) {
@@ -205,6 +214,7 @@ protected void startPoll() {
continue;
}
blockingQueue.put(result);
lastSuccessfulPointer = result.getPointer();
logger.debug(
"Put message {} with pointer {} to the blocking queue",
String.valueOf(result.getMessage().getPayload()),
@@ -214,8 +224,18 @@
// update the batch start pointer to the next batch
batchStartPointer = consumer.nextPointer();
} catch (Throwable e) {
// TODO better error handling
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);

if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
// Blocking error encountered. Pause poller to stop processing remaining updates.
pause();
} else {
// Advance the batch start pointer to ignore the error and continue from next record
batchStartPointer = lastSuccessfulPointer == null
? consumer.nextPointer(batchStartPointer)
: consumer.nextPointer(lastSuccessfulPointer);
}
}
}
}
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This error handling strategy drops failures and proceeds with remaining updates in the ingestion source.
*/
public class DropIngestionErrorStrategy implements IngestionErrorStrategy {
private static final Logger logger = LogManager.getLogger(DropIngestionErrorStrategy.class);
private final String ingestionSource;

public DropIngestionErrorStrategy(String ingestionSource) {
this.ingestionSource = ingestionSource;
}

@Override
public void handleError(Throwable e, ErrorStage stage) {
logger.error("Error processing update from {}: {}", ingestionSource, e);

// todo: record failed update stats and emit metrics
}

@Override
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
return false;
}

}
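Since the setting is registered with Property.Dynamic, the strategy can also be switched on an existing index. A hypothetical update-settings call using an assumed Client handle (the index name is made up, and whether an already-running poller picks the change up immediately is not shown in these hunks):

    client.admin().indices().prepareUpdateSettings("my-ingestion-index")
        .setSettings(Settings.builder().put(IndexMetadata.SETTING_INGESTION_SOURCE_ERROR_STRATEGY, "drop").build())
        .get();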