[router][common][fast-client] Add RetryManager to venice-router (#1058)
* [router][common][fast-client] Add RetryManager to venice-router for long tail retry

1. Moved RetryManager to venice-common so it can be used by both fast-client and venice-router. Refactored the constructor so that venice-router can initialize RetryManager instances per store (at most two per store: single-get and multi-get) without creating multiple schedulers.

2. The fast-client RetryManager already has store-level resource isolation because each fast-client instance is mapped to a single Venice store. With this change, the venice-router RetryManager gets the same store-level resource isolation.

3. Updated the fast-client RetryManagerStats to include the store name in the metric prefix, avoiding conflicts and confusion when a host runs multiple fast-client instances for multiple stores.
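
For illustration, a minimal sketch of the per-store wiring this refactor enables on the router side, assuming a shared scheduler owned by the caller. The enclosing class, field names, and literal values are hypothetical; only RetryManager, DaemonThreadFactory, and the constructor that accepts an externally managed ScheduledExecutorService come from this change.

import com.linkedin.venice.meta.RetryManager;
import com.linkedin.venice.utils.DaemonThreadFactory;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical helper, not part of this commit: shows per-store RetryManager
// instances all sharing a single scheduler thread.
public class PerStoreRetryManagers {
  private static final long ENFORCEMENT_WINDOW_MS = 30_000L; // illustrative value
  private static final double RETRY_BUDGET_PERCENT_DECIMAL = 0.1d; // 10% of original request rate

  private final ScheduledExecutorService sharedScheduler =
      Executors.newScheduledThreadPool(1, new DaemonThreadFactory("router-retry-manager-thread"));
  private final Map<String, RetryManager> singleKeyRetryManagers = new ConcurrentHashMap<>();

  public RetryManager getOrCreateSingleKeyRetryManager(String storeName, MetricsRepository metrics) {
    // Each store gets its own retry budget and metric prefix, but no new scheduler is created.
    return singleKeyRetryManagers.computeIfAbsent(
        storeName,
        name -> new RetryManager(
            metrics,
            "single-key-long-tail-retry-manager-" + name,
            ENFORCEMENT_WINDOW_MS,
            RETRY_BUDGET_PERCENT_DECIMAL,
            sharedScheduler));
  }
}

A second map for multi-key RetryManager instances would follow the same pattern, which is how a store ends up with at most two RetryManager instances on the same scheduler.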
xunyin8 authored Jul 26, 2024
1 parent df1e78b commit 4a20cdf
Showing 22 changed files with 567 additions and 196 deletions.
@@ -8,12 +8,15 @@
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.fastclient.meta.RetryManager;
import com.linkedin.venice.meta.RetryManager;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.Schema;
@@ -31,25 +34,28 @@
* 2. Leverage some smart logic to avoid useless retry, such as retry triggered by heavy GC.
*/
public class RetriableAvroGenericStoreClient<K, V> extends DelegatingAvroStoreClient<K, V> {
private static final String FAST_CLIENT_RETRY_MANAGER_THREAD_PREFIX = "Fast-client-retry-manager-thread";
private final boolean longTailRetryEnabledForSingleGet;
private final boolean longTailRetryEnabledForBatchGet;
private final boolean longTailRetryEnabledForCompute;
private final int longTailRetryThresholdForSingleGetInMicroSeconds;
private final int longTailRetryThresholdForBatchGetInMicroSeconds;
private final int longTailRetryThresholdForComputeInMicroSeconds;
private final TimeoutProcessor timeoutProcessor;
private final ScheduledExecutorService retryManagerExecutorService =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory(FAST_CLIENT_RETRY_MANAGER_THREAD_PREFIX));
/**
* The long tail retry budget is only applied to long tail retries. If there were any exception that's not a 429 the
* retry will be triggered without going through the long tail {@link RetryManager}. If the retry budget is exhausted
* retry will be triggered without going through the long tail {@link com.linkedin.venice.meta.RetryManager}. If the retry budget is exhausted
* then the retry task will do nothing and the request will either complete eventually (original future) or time out.
*/
private RetryManager singleGetLongTailRetryManager = null;
private RetryManager multiGetLongTailRetryManager = null;
private RetryManager singleKeyLongTailRetryManager = null;
private RetryManager multiKeyLongTailRetryManager = null;
private static final Logger LOGGER = LogManager.getLogger(RetriableAvroGenericStoreClient.class);
// Default value of 0.1 meaning only 10 percent of the user requests are allowed to trigger long tail retry
private static final double LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL = 0.1d;
private static final String SINGLE_GET_LONG_TAIL_RETRY_STATS_PREFIX = "single-get-long-tail-retry-manager";
private static final String MULTI_GET_LONG_TAIL_RETRY_STATS_PREFIX = "multi-get-long-tail-retry-manager";
private static final String SINGLE_KEY_LONG_TAIL_RETRY_STATS_PREFIX = "single-key-long-tail-retry-manager-";
private static final String MULTI_KEY_LONG_TAIL_RETRY_STATS_PREFIX = "multi-key-long-tail-retry-manager-";

public RetriableAvroGenericStoreClient(
InternalAvroStoreClient<K, V> delegate,
@@ -71,18 +77,20 @@ public RetriableAvroGenericStoreClient(
clientConfig.getLongTailRetryThresholdForComputeInMicroSeconds();
this.timeoutProcessor = timeoutProcessor;
if (longTailRetryEnabledForSingleGet) {
this.singleGetLongTailRetryManager = new RetryManager(
this.singleKeyLongTailRetryManager = new RetryManager(
clientConfig.getClusterStats().getMetricsRepository(),
SINGLE_GET_LONG_TAIL_RETRY_STATS_PREFIX,
SINGLE_KEY_LONG_TAIL_RETRY_STATS_PREFIX + clientConfig.getStoreName(),
clientConfig.getLongTailRetryBudgetEnforcementWindowInMs(),
LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL);
LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL,
retryManagerExecutorService);
}
if (longTailRetryEnabledForBatchGet) {
this.multiGetLongTailRetryManager = new RetryManager(
this.multiKeyLongTailRetryManager = new RetryManager(
clientConfig.getClusterStats().getMetricsRepository(),
MULTI_GET_LONG_TAIL_RETRY_STATS_PREFIX,
MULTI_KEY_LONG_TAIL_RETRY_STATS_PREFIX + clientConfig.getStoreName(),
clientConfig.getLongTailRetryBudgetEnforcementWindowInMs(),
LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL);
LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL,
retryManagerExecutorService);
}
}

@@ -133,7 +141,7 @@ protected CompletableFuture<V> get(GetRequestContext requestContext, K key) thro
// if longTailRetry is not enabled for single get, simply return the original future
return originalRequestFuture;
}
singleGetLongTailRetryManager.recordRequest();
singleKeyLongTailRetryManager.recordRequest();
final CompletableFuture<V> retryFuture = new CompletableFuture<>();
final CompletableFuture<V> finalFuture = new CompletableFuture<>();

@@ -145,7 +153,7 @@ protected CompletableFuture<V> get(GetRequestContext requestContext, K key) thro
retryFuture.completeExceptionally(savedException.get());
return;
}
if (savedException.get() != null || singleGetLongTailRetryManager.isRetryAllowed()) {
if (savedException.get() != null || singleKeyLongTailRetryManager.isRetryAllowed()) {
super.get(requestContext, key).whenComplete((value, throwable) -> {
if (throwable != null) {
retryFuture.completeExceptionally(throwable);
@@ -261,12 +269,7 @@ public void compute(

@Override
public void close() {
if (singleGetLongTailRetryManager != null) {
singleGetLongTailRetryManager.close();
}
if (multiGetLongTailRetryManager != null) {
multiGetLongTailRetryManager.close();
}
retryManagerExecutorService.shutdownNow();
super.close();
}

@@ -309,7 +312,7 @@ private <R extends MultiKeyRequestContext<K, V>, RESPONSE> void retryStreamingMu
finalRequestCompletionFuture.completeExceptionally(throwable);
return;
}
if (throwable != null || multiGetLongTailRetryManager.isRetryAllowed()) {
if (throwable != null || multiKeyLongTailRetryManager.isRetryAllowed()) {
Set<K> pendingKeys = Collections.unmodifiableSet(pendingKeysFuture.keySet());
R retryRequestContext =
requestContextConstructor.construct(pendingKeys.size(), requestContext.isPartialSuccessAllowed);
@@ -357,7 +360,7 @@ private <R extends MultiKeyRequestContext<K, V>, RESPONSE> void retryStreamingMu
savedException,
pendingKeysFuture,
scheduledRetryTask));
multiGetLongTailRetryManager.recordRequest();
multiKeyLongTailRetryManager.recordRequest();

finalRequestCompletionFuture.whenComplete((ignore, finalException) -> {
if (!scheduledRetryTask.isDone()) {

This file was deleted.

@@ -2085,4 +2085,31 @@ private ConfigKeys() {
* Only used in batch push jobs and partial updates.
*/
public static final String CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES = "controller.default.max.record.size.bytes";

/**
* Percentage of total single get requests that are allowed for retry in decimal. e.g. 0.1 would mean up to 10% of the
* total single get requests are allowed for long tail retry. This is to prevent retry storm and cascading failures.
*/
public static final String ROUTER_SINGLE_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL =
"router.single.key.long.tail.retry.budget.percent.decimal";

/**
* Percentage of total multi get requests that are allowed for retry in decimal. e.g. 0.1 would mean up to 10% of the
* total multi get requests are allowed for long tail retry. This is to prevent retry storm and cascading failures.
*/
public static final String ROUTER_MULTI_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL =
"router.multi.key.long.tail.retry.budget.percent.decimal";

/**
* Enforcement window for router long tail retry budget token bucket. This applies to both single get and multi get
* retry managers.
*/
public static final String ROUTER_LONG_TAIL_RETRY_BUDGET_ENFORCEMENT_WINDOW_MS =
"router.long.tail.retry.budget.enforcement.window.ms";

/**
* The core pool size for the thread pool executor which contains threads responsible for measuring and updating all
* retry managers in router periodically to provide retry budget based on a percentage of the original requests.
*/
public static final String ROUTER_RETRY_MANAGER_CORE_POOL_SIZE = "router.retry.manager.core.pool.size";
}
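
For reference, a short sketch of how these keys might be set when configuring a router. The key strings are the ones defined above; the values and the plain Properties usage are illustrative assumptions, not defaults introduced by this change.

import java.util.Properties;

public class RouterRetryBudgetConfigExample {
  public static Properties routerRetryBudgetProps() {
    Properties props = new Properties();
    // Allow long tail retries for up to 10% of single-key and multi-key requests.
    props.setProperty("router.single.key.long.tail.retry.budget.percent.decimal", "0.1");
    props.setProperty("router.multi.key.long.tail.retry.budget.percent.decimal", "0.1");
    // Re-evaluate the retry budget every 30 seconds (illustrative window).
    props.setProperty("router.long.tail.retry.budget.enforcement.window.ms", "30000");
    // Core pool size for the scheduler that periodically updates all per-store RetryManager instances.
    props.setProperty("router.retry.manager.core.pool.size", "1");
    return props;
  }
}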
@@ -1,11 +1,9 @@
package com.linkedin.venice.fastclient.meta;
package com.linkedin.venice.meta;

import com.linkedin.venice.fastclient.stats.RetryManagerStats;
import com.linkedin.venice.stats.RetryManagerStats;
import com.linkedin.venice.throttle.TokenBucket;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.time.Clock;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -15,11 +13,7 @@
import org.apache.logging.log4j.Logger;


/**
* This class offers advanced client retry behaviors. Specifically enforcing a retry budget and relevant monitoring to
* avoid retry storm and alert users when the retry threshold is misconfigured or service is degrading.
*/
public class RetryManager implements Closeable {
public class RetryManager {
private static final Logger LOGGER = LogManager.getLogger(RetryManager.class);
private static final int TOKEN_BUCKET_REFILL_INTERVAL_IN_SECONDS = 1;
/**
@@ -43,16 +37,16 @@ public RetryManager(
String metricNamePrefix,
long enforcementWindowInMs,
double retryBudgetInPercentDecimal,
Clock clock) {
Clock clock,
ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
if (enforcementWindowInMs <= 0 || retryBudgetInPercentDecimal <= 0) {
scheduler = null;
retryBudgetEnabled.set(false);
} else {
scheduler = Executors.newScheduledThreadPool(1);
retryBudgetEnabled.set(true);
lastUpdateTimestamp = clock.millis();
retryManagerStats = new RetryManagerStats(metricsRepository, metricNamePrefix, this);
scheduler.schedule(this::updateRetryTokenBucket, enforcementWindowInMs, TimeUnit.MILLISECONDS);
this.scheduler.schedule(this::updateRetryTokenBucket, enforcementWindowInMs, TimeUnit.MILLISECONDS);
}
this.enforcementWindowInMs = enforcementWindowInMs;
this.retryBudgetInPercentDecimal = retryBudgetInPercentDecimal;
@@ -63,8 +57,15 @@ public RetryManager(
MetricsRepository metricsRepository,
String metricNamePrefix,
long enforcementWindowInMs,
double retryBudgetInPercentDecimal) {
this(metricsRepository, metricNamePrefix, enforcementWindowInMs, retryBudgetInPercentDecimal, Clock.systemUTC());
double retryBudgetInPercentDecimal,
ScheduledExecutorService scheduler) {
this(
metricsRepository,
metricNamePrefix,
enforcementWindowInMs,
retryBudgetInPercentDecimal,
Clock.systemUTC(),
scheduler);
}

public void recordRequest() {
Expand All @@ -74,15 +75,19 @@ public void recordRequest() {
}

public boolean isRetryAllowed() {
return this.isRetryAllowed(1);
}

public boolean isRetryAllowed(int numberOfRetries) {
TokenBucket tokenBucket = retryTokenBucket.get();
if (!retryBudgetEnabled.get() || tokenBucket == null) {
// All retries are allowed when the feature is disabled or during the very first enforcement window when we
// haven't collected enough data points yet
return true;
}
boolean retryAllowed = retryTokenBucket.get().tryConsume(1);
boolean retryAllowed = retryTokenBucket.get().tryConsume(numberOfRetries);
if (!retryAllowed) {
retryManagerStats.recordRejectedRetry();
retryManagerStats.recordRejectedRetry(numberOfRetries);
}
return retryAllowed;
}
@@ -132,11 +137,4 @@ private void updateTokenBucket(long newQPS) {
public TokenBucket getRetryTokenBucket() {
return retryTokenBucket.get();
}

@Override
public void close() {
if (scheduler != null) {
scheduler.shutdownNow();
}
}
}
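
A usage sketch of the refactored RetryManager, under the assumption that the caller now owns the scheduler lifecycle (the class previously shut down its own scheduler in close(), which is removed above). The recordRequest()/isRetryAllowed(int) calls and the constructor are from this change; the enclosing main method and the literal values are illustrative.

import com.linkedin.venice.meta.RetryManager;
import io.tehuti.metrics.MetricsRepository;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class RetryBudgetUsageExample {
  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    RetryManager retryManager = new RetryManager(
        new MetricsRepository(),
        "multi-key-long-tail-retry-manager-test-store", // per-store metric prefix
        30_000L, // enforcement window in ms (illustrative)
        0.1d,    // retry budget: 10% of the original request rate
        scheduler);

    retryManager.recordRequest(); // every original request feeds the budget calculation

    int pendingKeys = 5;
    // Multi-key callers can consume one token per retried key. During the first
    // enforcement window (before a token bucket exists) retries are always allowed.
    if (retryManager.isRetryAllowed(pendingKeys)) {
      // issue the long tail retry for the pending keys
    }

    scheduler.shutdownNow(); // the caller is responsible for shutting the scheduler down
  }
}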
@@ -1,7 +1,6 @@
package com.linkedin.venice.fastclient.stats;
package com.linkedin.venice.stats;

import com.linkedin.venice.fastclient.meta.RetryManager;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.meta.RetryManager;
import com.linkedin.venice.throttle.TokenBucket;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
@@ -35,7 +34,7 @@ public RetryManagerStats(MetricsRepository metricsRepository, String name, Retr
rejectedRetrySensor = registerSensorIfAbsent("rejected_retry", new OccurrenceRate());
}

public void recordRejectedRetry() {
rejectedRetrySensor.record();
public void recordRejectedRetry(int count) {
rejectedRetrySensor.record(count);
}
}