[fast-client] Complete error retry for fast client multi-get (#892)
* [fast-client] Complete error retry for fast client multi-get

1. Error retry was already covered by long tail retry, which means error retry happens at
the request level rather than at the per-route level: keys that are not done by the long
tail retry threshold will be retried regardless of whether the cause is latency or an error.
We have unit tests for the scenarios needed to complete the table:
original request | retry request
fail             | fail
fail             | success
late success     | fail
late success     | success

2. Added the ability to differentiate 429 from other errors so retry is skipped altogether
when we encounter a 429 (assuming a subsequent retry would also be rejected). Both behaviors
are sketched below.
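To make the two points above concrete, here is a minimal, self-contained sketch of the request-level retry decision for a single attempt. It is illustrative only and not part of this diff: RequestLevelRetrySketch, HttpStatusException, and the attempt supplier are hypothetical stand-ins, and a plain JDK ScheduledExecutorService stands in for Venice's TimeoutProcessor; the real logic lives in RetriableAvroGenericStoreClient below.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class RequestLevelRetrySketch {
  private static final int HTTP_TOO_MANY_REQUESTS = 429;

  /** Hypothetical stand-in for a client exception that carries an HTTP status. */
  static class HttpStatusException extends RuntimeException {
    final int httpStatus;

    HttpStatusException(int httpStatus) {
      super("HTTP " + httpStatus);
      this.httpStatus = httpStatus;
    }
  }

  private static boolean isTooManyRequests(Throwable t) {
    return t instanceof HttpStatusException && ((HttpStatusException) t).httpStatus == HTTP_TOO_MANY_REQUESTS;
  }

  /**
   * Schedules exactly one retry for the whole request at the long-tail threshold. A non-429 failure of the
   * original attempt pulls the retry forward; a 429 cancels it, assuming the retry would also be rejected.
   */
  static <V> CompletableFuture<V> getWithRetry(
      Supplier<CompletableFuture<V>> attempt,
      ScheduledExecutorService scheduler,
      long longTailRetryThresholdMicros) {
    CompletableFuture<V> finalFuture = new CompletableFuture<>();
    CompletableFuture<V> retryFuture = new CompletableFuture<>();
    AtomicReference<Throwable> savedError = new AtomicReference<>();

    Runnable retryTask = () -> {
      if (isTooManyRequests(savedError.get())) {
        // Defensive: the original attempt already failed with a 429, so skip the retry altogether.
        retryFuture.completeExceptionally(savedError.get());
        return;
      }
      attempt.get().whenComplete((value, error) -> {
        if (error == null) {
          retryFuture.complete(value);
        } else {
          retryFuture.completeExceptionally(error);
        }
      });
    };
    ScheduledFuture<?> scheduledRetry =
        scheduler.schedule(retryTask, longTailRetryThresholdMicros, TimeUnit.MICROSECONDS);

    // The final future succeeds as soon as either attempt produces a value ("fail | success",
    // "late success | fail", ...) and fails only once no value can be expected any more.
    retryFuture.whenComplete((value, retryError) -> {
      if (retryError == null) {
        finalFuture.complete(value);
      } else if (savedError.get() != null) {
        finalFuture.completeExceptionally(savedError.get()); // both attempts failed: surface the original error
      }
      // If only the retry failed, keep waiting: the original may still come back as a late success.
    });

    attempt.get().whenComplete((value, error) -> {
      if (error == null) {
        finalFuture.complete(value); // original won; a pending or in-flight retry becomes irrelevant
        scheduledRetry.cancel(false);
      } else {
        savedError.set(error);
        boolean retryNotStartedYet = scheduledRetry.cancel(false);
        if (retryNotStartedYet && isTooManyRequests(error)) {
          finalFuture.completeExceptionally(error); // 429: skip the retry entirely
        } else {
          if (retryNotStartedYet) {
            retryTask.run(); // error retry: fire immediately instead of waiting for the threshold
          }
          // The retry is now running (or already done); if it does not produce a value, surface this error.
          retryFuture.whenComplete((ignored, retryError) -> {
            if (retryError != null) {
              finalFuture.completeExceptionally(error);
            }
          });
        }
      }
    });
    return finalFuture;
  }
}

In the actual change the same decision is made once per multi-get request: a single scheduled retry covers all keys still pending at the threshold, and a 429 on any route cancels it.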
xunyin8 authored Mar 13, 2024
1 parent dfbc9d4 commit da8f900
Showing 7 changed files with 229 additions and 43 deletions.
@@ -215,11 +215,9 @@ public int getPartitionId() {
}

static class RetryContext<K, V> {
MultiKeyRequestContext<K, V> originalRequestContext;
MultiKeyRequestContext<K, V> retryRequestContext;

RetryContext() {
originalRequestContext = null;
retryRequestContext = null;
}
}
@@ -2,6 +2,8 @@

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
import com.linkedin.venice.client.exceptions.VeniceClientRateExceededException;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compute.ComputeRequestWrapper;
@@ -34,10 +36,13 @@ public class RetriableAvroGenericStoreClient<K, V> extends DelegatingAvroStoreCl
private final int longTailRetryThresholdForSingleGetInMicroSeconds;
private final int longTailRetryThresholdForBatchGetInMicroSeconds;
private final int longTailRetryThresholdForComputeInMicroSeconds;
private TimeoutProcessor timeoutProcessor;
private final TimeoutProcessor timeoutProcessor;
private static final Logger LOGGER = LogManager.getLogger(RetriableAvroGenericStoreClient.class);

public RetriableAvroGenericStoreClient(InternalAvroStoreClient<K, V> delegate, ClientConfig clientConfig) {
public RetriableAvroGenericStoreClient(
InternalAvroStoreClient<K, V> delegate,
ClientConfig clientConfig,
TimeoutProcessor timeoutProcessor) {
super(delegate, clientConfig);
if (!(clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
|| clientConfig.isLongTailRetryEnabledForCompute())) {
@@ -52,6 +57,7 @@ public RetriableAvroGenericStoreClient(InternalAvroStoreClient<K, V> delegate, C
clientConfig.getLongTailRetryThresholdForBatchGetInMicroSeconds();
this.longTailRetryThresholdForComputeInMicroSeconds =
clientConfig.getLongTailRetryThresholdForComputeInMicroSeconds();
this.timeoutProcessor = timeoutProcessor;
}

enum RetryType {
@@ -101,19 +107,17 @@ protected CompletableFuture<V> get(GetRequestContext requestContext, K key) thro
// if longTailRetry is not enabled for single get, simply return the original future
return originalRequestFuture;
}

if (timeoutProcessor == null) {
/**
* Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} to
* reduce the thread usage.
*/
timeoutProcessor = requestContext.instanceHealthMonitor.getTimeoutProcessor();
}
final CompletableFuture<V> retryFuture = new CompletableFuture<>();
final CompletableFuture<V> finalFuture = new CompletableFuture<>();

AtomicReference<Throwable> savedException = new AtomicReference<>();
// create a retry task
Runnable retryTask = () -> {
if (savedException.get() != null && isExceptionCausedByTooManyRequests(savedException.get())) {
// Defensive code, abort retry if original request failed due to 429
retryFuture.completeExceptionally(savedException.get());
return;
}
super.get(requestContext, key).whenComplete((value, throwable) -> {
if (throwable != null) {
retryFuture.completeExceptionally(throwable);
@@ -146,10 +150,14 @@ protected CompletableFuture<V> get(GetRequestContext requestContext, K key) thro
requestContext.retryContext.retryWin = false;
}
} else {
// Trigger the retry right away when receiving any error
// Trigger the retry right away when receiving any error that's not a 429; otherwise cancel any scheduled retry
savedException.set(throwable);
if (!timeoutFuture.isDone()) {
timeoutFuture.cancel();
new RetryRunnable(requestContext, RetryType.ERROR_RETRY, retryTask).run();
if (!isExceptionCausedByTooManyRequests(throwable)) {
new RetryRunnable(requestContext, RetryType.ERROR_RETRY, retryTask).run();
}
}
}
});
@@ -232,7 +240,6 @@ private <R extends MultiKeyRequestContext<K, V>, RESPONSE> void retryStreamingMu
R originalRequestContext = requestContextConstructor.construct(keys.size(), requestContext.isPartialSuccessAllowed);

requestContext.retryContext = new MultiKeyRequestContext.RetryContext<K, V>();
requestContext.retryContext.originalRequestContext = originalRequestContext;

/** Track the final completion of the request. It will be completed normally if
1. the original requests calls onCompletion with no exception
@@ -253,19 +260,15 @@ private <R extends MultiKeyRequestContext<K, V>, RESPONSE> void retryStreamingMu
pendingKeysFuture.put(key, originalCompletion);
}

streamingRequestExecutor.trigger(
originalRequestContext,
keys,
getStreamingCallback(originalRequestContext, finalRequestCompletionFuture, savedException, pendingKeysFuture));

if (timeoutProcessor == null) {
/** Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} of the original request to
reduce thread usage */
timeoutProcessor = requestContext.retryContext.originalRequestContext.instanceHealthMonitor.getTimeoutProcessor();
}

Runnable retryTask = () -> { // Look at the remaining keys and setup completion
if (!pendingKeysFuture.isEmpty()) {
Throwable throwable = savedException.get();
if (isExceptionCausedByTooManyRequests(throwable)) {
// Defensive code, do not trigger retry and complete the final request completion future if we encountered
// 429.
finalRequestCompletionFuture.completeExceptionally(throwable);
return;
}
Set<K> pendingKeys = Collections.unmodifiableSet(pendingKeysFuture.keySet());
R retryRequestContext =
requestContextConstructor.construct(pendingKeys.size(), requestContext.isPartialSuccessAllowed);
@@ -278,7 +281,12 @@ private <R extends MultiKeyRequestContext<K, V>, RESPONSE> void retryStreamingMu
streamingRequestExecutor.trigger(
retryRequestContext,
pendingKeys,
getStreamingCallback(retryRequestContext, finalRequestCompletionFuture, savedException, pendingKeysFuture));
getStreamingCallback(
retryRequestContext,
finalRequestCompletionFuture,
savedException,
pendingKeysFuture,
null));
} else {
/** If there are no keys pending at this point , the onCompletion callback of the original
request will be triggered. So no need to do anything.*/
@@ -289,6 +297,25 @@ private <R extends MultiKeyRequestContext<K, V>, RESPONSE> void retryStreamingMu
TimeoutProcessor.TimeoutFuture scheduledRetryTask =
timeoutProcessor.schedule(retryTask, longTailRetryThresholdInMicroSeconds, TimeUnit.MICROSECONDS);

/**
* Retry for a streaming multi-key request is done at the request level. This means we will perform one retry for the
* entire request, for both errors and long tail, to avoid a retry storm. There are two behaviors with retry:
* 1. If any of the routes returns a too many requests (429) exception, we will try our best to cancel any scheduled
* retry and complete the final future. This means some routes could still be in progress, and we assume those
* will also soon fail with a 429.
* 2. If no 429 exceptions have been caught by the time the retry task runs (after longTailRetryThresholdInMicroSeconds),
* then all incomplete keys, whether incomplete due to long tail or to errors (e.g. mis-routed keys), are retried.
*/
streamingRequestExecutor.trigger(
originalRequestContext,
keys,
getStreamingCallback(
originalRequestContext,
finalRequestCompletionFuture,
savedException,
pendingKeysFuture,
scheduledRetryTask));

finalRequestCompletionFuture.whenComplete((ignore, finalException) -> {
if (!scheduledRetryTask.isDone()) {
scheduledRetryTask.cancel();
@@ -312,7 +339,8 @@ private <RESPONSE> StreamingCallback<K, RESPONSE> getStreamingCallback(
MultiKeyRequestContext<K, V> requestContext,
CompletableFuture<Void> finalRequestCompletionFuture,
AtomicReference<Throwable> savedException,
VeniceConcurrentHashMap<K, CompletableFuture<RESPONSE>> pendingKeysFuture) {
VeniceConcurrentHashMap<K, CompletableFuture<RESPONSE>> pendingKeysFuture,
TimeoutProcessor.TimeoutFuture scheduledRetryTask) {
return new StreamingCallback<K, RESPONSE>() {
@Override
public void onRecordReceived(K key, RESPONSE value) {
@@ -341,12 +369,21 @@ public void onCompletion(Optional<Exception> exception) {
if (!exceptionToSave.isPresent()) {
finalRequestCompletionFuture.complete(null);
} else {
// If we are able to set an exception, that means the other request did not have exception, so we continue.
if (!savedException.compareAndSet(null, exceptionToSave.get())) {
/* We are not able to set the exception , means there is already a saved exception.
/* If we are able to set an exception, that means the other request did not have exception, so we continue.
If we are not able to set the exception , means there is already a saved exception.
Since there was a saved exception and this request has also returned exception we can conclude that
the parent request can be marked with exception. We select the original exception. This future is
internal to this class and is allowed to complete exceptionally even for streaming APIs. */
boolean shouldCompleteRequestFuture = !savedException.compareAndSet(null, exceptionToSave.get());
if (scheduledRetryTask != null && isExceptionCausedByTooManyRequests(exceptionToSave.get())) {
// Check if the exception is 429, if so cancel the retry if the retry task exists
if (!scheduledRetryTask.isDone()) {
scheduledRetryTask.cancel();
shouldCompleteRequestFuture = true;
}
}

if (shouldCompleteRequestFuture) {
finalRequestCompletionFuture.completeExceptionally(exceptionToSave.get());
}
}
@@ -355,6 +392,14 @@ internal to this class and is allowed to complete exceptionally even for streami
};
}

private boolean isExceptionCausedByTooManyRequests(Throwable e) {
if (e instanceof VeniceClientHttpException) {
VeniceClientHttpException clientHttpException = (VeniceClientHttpException) e;
return clientHttpException.getHttpStatus() == VeniceClientRateExceededException.HTTP_TOO_MANY_REQUESTS;
}
return false;
}

interface RequestContextConstructor<K, V, R extends MultiKeyRequestContext<K, V>> {
R construct(int numKeysInRequest, boolean isPartialSuccessAllowed);
}
@@ -1,12 +1,16 @@
package com.linkedin.venice.fastclient;

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import org.apache.avro.specific.SpecificRecord;


public class RetriableAvroSpecificStoreClient<K, V extends SpecificRecord> extends RetriableAvroGenericStoreClient<K, V>
implements AvroSpecificStoreClient<K, V> {
public RetriableAvroSpecificStoreClient(InternalAvroStoreClient<K, V> delegate, ClientConfig clientConfig) {
super(delegate, clientConfig);
public RetriableAvroSpecificStoreClient(
InternalAvroStoreClient<K, V> delegate,
ClientConfig clientConfig,
TimeoutProcessor timeoutProcessor) {
super(delegate, clientConfig, timeoutProcessor);
}
}
@@ -80,7 +80,14 @@ public static <K, V> AvroGenericStoreClient<K, V> getAndStartGenericStoreClient(
if (clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
|| clientConfig.isLongTailRetryEnabledForCompute()) {
statsStoreClient = new StatsAvroGenericStoreClient<>(
new RetriableAvroGenericStoreClient<>(dispatchingStoreClient, clientConfig),
/**
* Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} to
* reduce the thread usage.
*/
new RetriableAvroGenericStoreClient<>(
dispatchingStoreClient,
clientConfig,
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor()),
clientConfig);
} else {
statsStoreClient = new StatsAvroGenericStoreClient<>(dispatchingStoreClient, clientConfig);
@@ -104,7 +111,10 @@ public static <K, V extends SpecificRecord> AvroSpecificStoreClient<K, V> getAnd
if (clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
|| clientConfig.isLongTailRetryEnabledForCompute()) {
statsStoreClient = new StatsAvroSpecificStoreClient<>(
new RetriableAvroSpecificStoreClient<>(dispatchingStoreClient, clientConfig),
new RetriableAvroSpecificStoreClient<>(
dispatchingStoreClient,
clientConfig,
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor()),
clientConfig);
} else {
statsStoreClient = new StatsAvroSpecificStoreClient<>(dispatchingStoreClient, clientConfig);
@@ -608,6 +608,66 @@ public void testStreamingBatchGetLongTailBothError()
0); // no retries are successful. Not counted regardless
}

/**
* same as {@link #testStreamingBatchGetLongTailRetryOriginalRequestErrorBeforeRetry} but with multiple routes
* throwing errors.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testStreamingBatchGetLongTailRetryWithMultipleErrors()
throws InterruptedException, ExecutionException, TimeoutException {

int expectedRetryKeyCount = (NUM_KEYS / NUM_PARTITIONS) * 2;
TestClientSimulator client = new TestClientSimulator();
setupLongTailRetryWithMultiplePartitions(client)
.expectRequestWithKeysForPartitionOnRoute(1, 1, "https://host0.linkedin.com", 0)
.expectRequestWithKeysForPartitionOnRoute(1, 2, "https://host1.linkedin.com", 1)
.expectRequestWithKeysForPartitionOnRoute(1, 3, "https://host2.linkedin.com", 2)
.respondToRequestWithKeyValues(5, 1)
.respondToRequestWithError(6, 2, 500)
.respondToRequestWithError(7, 3, 500)
.expectRequestWithKeysForPartitionOnRoute(50, 4, "https://host1.linkedin.com", 2)
.expectRequestWithKeysForPartitionOnRoute(51, 5, "https://host0.linkedin.com", 1)
.respondToRequestWithKeyValues(55, 4)
.respondToRequestWithKeyValues(56, 5)
.simulate();

callStreamingBatchGetAndVerifyResults(
client.getFastClient(),
client.getRequestedKeyValues(),
client.getSimulatorCompleteFuture());

validateMetrics(client, NUM_KEYS, NUM_KEYS, expectedRetryKeyCount, expectedRetryKeyCount);
}

/**
* same as {@link #testStreamingBatchGetLongTailRetryOriginalRequestErrorBeforeRetry} but request id 3 throws a 429 too
* many requests error. Long tail retry should be skipped and the quota exceeded exception propagated to the caller.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testStreamingBatchGetNoLongTailRetryWithTooManyRequest()
throws InterruptedException, ExecutionException, TimeoutException {

TestClientSimulator client = new TestClientSimulator();
setupLongTailRetryWithMultiplePartitions(client)
.expectRequestWithKeysForPartitionOnRoute(1, 1, "https://host0.linkedin.com", 0)
.expectRequestWithKeysForPartitionOnRoute(1, 2, "https://host1.linkedin.com", 1)
.expectRequestWithKeysForPartitionOnRoute(1, 3, "https://host2.linkedin.com", 2)
.respondToRequestWithKeyValues(5, 1)
.respondToRequestWithKeyValues(6, 2)
.respondToRequestWithError(7, 3, 429)
.simulate();

callStreamingBatchGetAndVerifyResults(
client.getFastClient(),
client.getRequestedKeyValues(),
client.getSimulatorCompleteFuture(),
true);

// Technically it's (NUM_KEYS / 3) * 2 for successful keys, but we aren't incrementing success metrics in case of a
// failed request
validateMetrics(client, NUM_KEYS, 0, 0, 0);
}

private TestClientSimulator setupLongTailRetryWithMultiplePartitions(TestClientSimulator client) {
return client.generateKeyValues(0, NUM_KEYS) // generate NUM_KEYS keys
.partitionKeys(NUM_PARTITIONS) // partition into NUM_PARTITIONS partitions
@@ -662,7 +722,7 @@ public void onRecordReceived(String key, Utf8 value) {

@Override
public void onCompletion(Optional<Exception> exception) {
LOGGER.info("OnCompletion called . Exception: {} isComplete: {} ", exception, isComplete.get());
LOGGER.info("OnCompletion called. Exception: {} isComplete: {} ", exception, isComplete.get());
if (!exception.isPresent()) {
assertEquals(exception, Optional.empty());
assertTrue(isComplete.compareAndSet(false, true));
@@ -696,6 +756,10 @@ public void onCompletion(Optional<Exception> exception) {
try {
allCompletionFuture.get(CLIENT_TIME_OUT_IN_SECONDS, TimeUnit.SECONDS);
} catch (Exception exception) {
LOGGER.info(
"Record completion future is done: {}, simulator completion future is done: {}",
recordCompletionFuture.isDone(),
simulatorCompletionFuture.isDone());
if (expectedError) {
LOGGER.info("Test completed successfully because was expecting an exception");
} else
