DispatchingAvroGenericStoreClient.java
@@ -273,6 +273,7 @@ protected CompletableFuture<V> get(GetRequestContext<K> requestContext, K key) t
requestContext.routeRequestMap.put(requestContext.route, routeRequestFuture);
}
routeRequestFuture.completeExceptionally(e);
valueFuture.completeExceptionally(e);
}

return valueFuture;
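
For context: the single added line above closes the synchronous failure path in DispatchingAvroGenericStoreClient#get. Previously the catch block completed only routeRequestFuture, so an exception thrown before the async transport callback was registered left the returned valueFuture incomplete forever (the reproduction test further down describes the exact scenario). Below is a stand-alone sketch of that pattern under illustrative names (SyncThrowHangDemo, transportGet), not the actual Venice method body:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncThrowHangDemo {
  // Stand-in for transportClient.get(uri) failing synchronously, e.g. "Connection pool exhausted".
  static CompletableFuture<String> transportGet(String uri) {
    throw new RuntimeException("Connection pool exhausted");
  }

  static CompletableFuture<String> get(String key, boolean completeValueFutureOnCatch) {
    CompletableFuture<String> valueFuture = new CompletableFuture<>();
    try {
      // If transportGet throws here, the async callback below is never registered.
      transportGet("/storage/" + key).whenComplete((value, throwable) -> {
        if (throwable != null) {
          valueFuture.completeExceptionally(throwable);
        } else {
          valueFuture.complete(value);
        }
      });
    } catch (Exception e) {
      if (completeValueFutureOnCatch) {
        // Mirrors the added line: the caller-facing future is failed as well.
        valueFuture.completeExceptionally(e);
      }
      // Without that call, only internal bookkeeping (routeRequestFuture) was completed.
    }
    return valueFuture;
  }

  public static void main(String[] args) throws Exception {
    try {
      get("test_key", false).get(1, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      System.out.println("old behavior: the caller hangs until its own timeout");
    }
    try {
      get("test_key", true).get(1, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.out.println("patched behavior: the caller sees the original error: " + e.getCause().getMessage());
    }
  }
}
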
RetriableAvroGenericStoreClient.java
@@ -1,6 +1,7 @@
package com.linkedin.venice.fastclient;

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.venice.annotation.VisibleForTesting;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientRateExceededException;
import com.linkedin.venice.client.store.ComputeGenericRecord;
@@ -111,6 +112,11 @@ public RetriableAvroGenericStoreClient(
BatchGetConfigUtils.parseRetryThresholdForBatchGet(longTailComputeRangeBasedRetryThresholdInMilliSeconds);
}

@VisibleForTesting
void setSingleKeyLongTailRetryManager(RetryManager retryManager) {
this.singleKeyLongTailRetryManager = retryManager;
}

enum RetryType {
LONG_TAIL_RETRY, ERROR_RETRY
}
@@ -186,6 +192,16 @@ protected CompletableFuture<V> get(GetRequestContext<K> requestContext, K key) t
}
}
});
} else {
// Budget exhausted: chain retryFuture to originalRequestFuture so the original error
// propagates instead of being masked by a synthetic "budget exhausted" exception.
originalRequestFuture.whenComplete((origValue, origThrowable) -> {
if (origThrowable != null) {
retryFuture.completeExceptionally(origThrowable);
} else {
retryFuture.complete(origValue);
}
});
}
};

@@ -213,6 +229,9 @@ protected CompletableFuture<V> get(GetRequestContext<K> requestContext, K key) t
timeoutFuture.cancel();
if (!isExceptionCausedByTooManyRequests(throwable)) {
new RetryRunnable(requestContext, RetryType.ERROR_RETRY, retryTask).run();
} else {
// 429 received before long-tail fired: retryTask won't run, complete retryFuture directly
retryFuture.completeExceptionally(throwable);
}
}
}
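
For context on the two additions above: RetriableAvroGenericStoreClient derives the future returned to the caller from CompletableFuture.allOf(originalRequestFuture, retryFuture) (as the reproduction tests below spell out), so if retryFuture is never completed the caller hangs even after the original request has failed. A self-contained sketch of that mechanism and of the whenComplete chaining used by the fix; the class and variable names are illustrative, not Venice code:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RetryFutureHangDemo {
  public static void main(String[] args) throws Exception {
    // Hang: allOf only completes once BOTH legs complete. If no retry is ever
    // issued (budget exhausted, or a 429 short-circuits the error retry) and
    // retryFuture is left untouched, the combined future never completes.
    CompletableFuture<String> original = new CompletableFuture<>();
    CompletableFuture<String> retry = new CompletableFuture<>();
    CompletableFuture<Void> combined = CompletableFuture.allOf(original, retry);
    original.completeExceptionally(new RuntimeException("Instance unhealthy, 500 error"));
    try {
      combined.get(1, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      System.out.println("retry leg never completed, so the combined future hangs");
    }

    // Fix: chain the retry leg to the original outcome whenever no retry will run,
    // so the original error (or value) propagates instead of stalling allOf.
    CompletableFuture<String> original2 = new CompletableFuture<>();
    CompletableFuture<String> retry2 = new CompletableFuture<>();
    original2.whenComplete((value, throwable) -> {
      if (throwable != null) {
        retry2.completeExceptionally(throwable);
      } else {
        retry2.complete(value);
      }
    });
    CompletableFuture<Void> combined2 = CompletableFuture.allOf(original2, retry2);
    original2.completeExceptionally(new RuntimeException("Instance unhealthy, 500 error"));
    try {
      combined2.get(1, TimeUnit.SECONDS);
    } catch (ExecutionException e) {
      System.out.println("chained wiring: the combined future fails with: " + e.getCause().getMessage());
    }
  }
}
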
DispatchingAvroGenericStoreClientTest.java
@@ -14,6 +14,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -1749,4 +1750,56 @@ public void testStreamingComputeToUnreachableClientV4() throws IOException, Exec
tearDown();
}
}

/**
* BUG REPRODUCTION: When an exception is thrown in
* {@link DispatchingAvroGenericStoreClient#get(GetRequestContext, Object)} before the
* {@code transportFuture.whenCompleteAsync(...)} callback is registered (or if
* {@code transportClient.get(...)} itself throws synchronously), the catch block completes
* {@code routeRequestFuture} but does NOT complete {@code valueFuture}. The returned
* {@code valueFuture} remains incomplete forever.
*
* This can happen when:
* - {@code metadata.trackHealthBasedOnRequestToInstance(...)} throws
* - {@code composeURIForSingleGet(...)} throws
* - {@code transportClient.get(...)} throws synchronously
*/
@Test(timeOut = TEST_TIMEOUT)
public void testGetValueFutureNeverCompletedWhenTransportClientThrowsSynchronously()
throws IOException, InterruptedException, ExecutionException {
try {
setUpClient();

// Simulate a synchronous failure from transportClient.get()
TransportClient throwingTransportClient = mock(TransportClient.class);
doThrow(new RuntimeException("Connection pool exhausted")).when(throwingTransportClient).get(any());

DispatchingAvroGenericStoreClient bugClient =
new DispatchingAvroGenericStoreClient(storeMetadata, clientConfig, throwingTransportClient);
bugClient.start();
bugClient.verifyMetadataInitialized();

GetRequestContext getRequestContext = new GetRequestContext();
CompletableFuture<GenericRecord> result = bugClient.get(getRequestContext, "test_key");

try {
// If the bug exists, this will time out because valueFuture was never completed
// The catch block only completes routeRequestFuture, not valueFuture
result.get(2, TimeUnit.SECONDS);
fail("Expected an exception to be thrown");
} catch (TimeoutException e) {
// BUG CONFIRMED: valueFuture was never completed
fail(
"BUG: valueFuture hangs forever when transportClient.get() throws synchronously. "
+ "The catch block in DispatchingAvroGenericStoreClient#get(...) that handles synchronous "
+ "exceptions completes routeRequestFuture.completeExceptionally(e) "
+ "but does NOT complete valueFuture. The returned future remains incomplete forever.");
} catch (ExecutionException e) {
// CORRECT: valueFuture was completed with the exception
assertTrue(e.getCause() instanceof RuntimeException || e.getCause() instanceof VeniceClientException);
}
} finally {
tearDown();
}
}
}
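
A side note, independent of the patch: a caller that cannot rule out a future like this never completing can protect itself with the standard CompletableFuture timeout API (Java 9+). A minimal illustration, with illustrative names:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CallerTimeoutGuard {
  public static void main(String[] args) throws Exception {
    CompletableFuture<String> neverCompleted = new CompletableFuture<>();
    // orTimeout fails the future with a TimeoutException if it has not
    // completed within the given duration, so the caller cannot hang forever.
    String result = neverCompleted.orTimeout(2, TimeUnit.SECONDS)
        .exceptionally(t -> "fallback after " + t.getClass().getSimpleName())
        .get();
    System.out.println(result);
  }
}
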
clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java
@@ -5,6 +5,7 @@
import static com.linkedin.venice.schema.Utils.loadSchemaFileAsString;
import static com.linkedin.venice.stats.ClientType.FAST_CLIENT;
import static com.linkedin.venice.stats.VeniceMetricsRepository.getVeniceMetricsRepository;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
Expand All @@ -20,6 +21,7 @@
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientRateExceededException;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
@@ -28,6 +30,7 @@
import com.linkedin.venice.fastclient.meta.StoreMetadata;
import com.linkedin.venice.fastclient.stats.FastClientStats;
import com.linkedin.venice.fastclient.utils.ClientTestUtils;
import com.linkedin.venice.meta.RetryManager;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.TestUtils;
@@ -45,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -711,7 +715,7 @@
if (requestType.equals(RequestType.SINGLE_GET)) {
testSingleGetAndValidateMetrics(false, false, true, true, keyNotFound, noReplicaFound);
} else if (requestType.equals(RequestType.MULTI_GET_STREAMING)) {
testStreamingBatchGetAndValidateMetrics(false, true, true, keyNotFound, noReplicaFound);

Check failure on line 718 in clients/venice-client/src/test/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClientTest.java (GitHub Actions / Clients / UT & CodeCov (17)): RetriableAvroGenericStoreClientTest.testGetWithTriggeringLongTailRetryAndRetryWins[14](MULTI_GET_STREAMING, true, false) failed with java.lang.AssertionError: expected [true] but found [false]
} else if (requestType.equals(RequestType.MULTI_GET)) {
testBatchGetAndValidateMetrics(false, true, true, keyNotFound, noReplicaFound);
} else if (requestType.equals(RequestType.COMPUTE_STREAMING)) {
@@ -848,4 +852,207 @@
testComputeAndValidateMetrics(true, true, false, false, false);
}
}

/**
* BUG REPRODUCTION: When the long-tail retry fires but the retry budget is exhausted
* (isRetryAllowed() returns false), the retryTask does nothing and retryFuture is never completed.
* Later, when the original request fails, the error retry path is skipped because
* timeoutFuture.isDone() is true (long-tail already consumed it). This leaves finalFuture
* incomplete forever.
*
* This is the exact scenario observed in the mirror heap dump: instance returns 500 on heartbeat,
* many requests time out exhausting the retry budget, subsequent requests hang forever.
*
* We use the setSingleKeyLongTailRetryManager test hook (added in this change) to replace the internal
* RetryManager with a mock that always denies retries, ensuring deterministic reproduction regardless of timing.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testFinalFutureHangsWhenRetryBudgetExhaustedAndOriginalFails() throws Exception {
clientConfigBuilder.setMetricsRepository(getVeniceMetricsRepository(FAST_CLIENT, CLIENT_METRIC_ENTITIES, true));
clientConfig = clientConfigBuilder.build();

StoreMetadata mockMetadata = mock(StoreMetadata.class);
doReturn(STORE_NAME).when(mockMetadata).getStoreName();
doReturn(1).when(mockMetadata).getLatestValueSchemaId();
doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1);

// Original request is slow: completes AFTER long-tail retry threshold with an error.
// This ensures the long-tail retry fires first, then original fails.
InternalAvroStoreClient dispatchingClient =
new DispatchingAvroGenericStoreClient<Object, Object>(mockMetadata, clientConfig) {
@Override
protected CompletableFuture get(GetRequestContext requestContext, Object key) throws VeniceClientException {
InstanceHealthMonitor instanceHealthMonitor = mock(InstanceHealthMonitor.class);
doReturn(timeoutProcessor).when(instanceHealthMonitor).getTimeoutProcessor();
requestContext.instanceHealthMonitor = instanceHealthMonitor;

final CompletableFuture originalRequestFuture = new CompletableFuture();
scheduledExecutor.schedule(
() -> originalRequestFuture
.completeExceptionally(new VeniceClientException("Instance unhealthy, 500 error")),
LONG_TAIL_RETRY_THRESHOLD_IN_MS * 3, // 300ms >> 100ms threshold
TimeUnit.MILLISECONDS);
return originalRequestFuture;
}
};

retriableClient = new RetriableAvroGenericStoreClient<>(dispatchingClient, clientConfig, timeoutProcessor);

// Replace the internal singleKeyLongTailRetryManager with a mock
// that always returns false for isRetryAllowed() (simulating exhausted budget)
RetryManager mockRetryManager = mock(RetryManager.class);
doReturn(false).when(mockRetryManager).isRetryAllowed();
doReturn(false).when(mockRetryManager).isRetryAllowed(anyInt());
retriableClient.setSingleKeyLongTailRetryManager(mockRetryManager);

GetRequestContext ctx = new GetRequestContext();
CompletableFuture<GenericRecord> result = retriableClient.get(ctx, "test_key");

try {
result.get(2, TimeUnit.SECONDS);
fail("Expected an ExecutionException");
} catch (TimeoutException e) {
// BUG CONFIRMED: finalFuture never completed.
// Timeline: long-tail retry fires at 100ms → retryTask runs → isRetryAllowed()=false → does nothing →
// retryFuture stays incomplete → original fails at 300ms → timeoutFuture.isDone()=true → error retry skipped
// → allOf(original, retryFuture) never completes → finalFuture hangs forever.
fail(
"BUG: finalFuture hangs forever when retry budget is exhausted and original request fails after "
+ "long-tail retry timer fires. retryFuture is never completed because retryTask does nothing "
+ "when isRetryAllowed() returns false, and the error retry path is skipped because "
+ "timeoutFuture.isDone() is true.");
} catch (ExecutionException e) {
// CORRECT: future completed with an exception (this means the bug is fixed)
assertTrue(e.getCause() instanceof VeniceClientException);
}
}

/**
* BUG REPRODUCTION: When original request fails with HTTP 429 (Too Many Requests),
* the error retry is intentionally skipped (429 should not be retried). But when the
* long-tail retry timer has NOT yet fired, the timer is cancelled, and retryFuture is
* never completed. CompletableFuture.allOf(original, retry) never completes, so
* finalFuture hangs forever.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testFinalFutureHangsWhenOriginalFailsWith429BeforeLongTailRetry()
throws InterruptedException, ExecutionException, TimeoutException {
clientConfigBuilder.setMetricsRepository(getVeniceMetricsRepository(FAST_CLIENT, CLIENT_METRIC_ENTITIES, true));
clientConfig = clientConfigBuilder.build();

StoreMetadata mockMetadata = mock(StoreMetadata.class);
doReturn(STORE_NAME).when(mockMetadata).getStoreName();
doReturn(1).when(mockMetadata).getLatestValueSchemaId();
doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1);

// Original request fails fast with 429 (BEFORE long-tail retry threshold)
InternalAvroStoreClient dispatchingClient =
new DispatchingAvroGenericStoreClient<Object, Object>(mockMetadata, clientConfig) {
@Override
protected CompletableFuture get(GetRequestContext requestContext, Object key) throws VeniceClientException {
InstanceHealthMonitor instanceHealthMonitor = mock(InstanceHealthMonitor.class);
doReturn(timeoutProcessor).when(instanceHealthMonitor).getTimeoutProcessor();
requestContext.instanceHealthMonitor = instanceHealthMonitor;

final CompletableFuture future = new CompletableFuture();
// Fail immediately with 429 - well before the 100ms long-tail retry threshold
scheduledExecutor.schedule(
() -> future.completeExceptionally(new VeniceClientRateExceededException("Too many requests")),
5,
TimeUnit.MILLISECONDS);
return future;
}
};

retriableClient = new RetriableAvroGenericStoreClient<>(dispatchingClient, clientConfig, timeoutProcessor);

GetRequestContext ctx = new GetRequestContext();
CompletableFuture<GenericRecord> result = retriableClient.get(ctx, "test_key");

try {
// If the bug exists, this will time out because retryFuture is never completed
result.get(2, TimeUnit.SECONDS);
fail("Expected an ExecutionException from 429");
} catch (TimeoutException e) {
// BUG CONFIRMED: finalFuture hangs because:
// 1. Original fails with 429 → savedException set, timeoutFuture cancelled
// 2. isExceptionCausedByTooManyRequests(429) → true → error retry skipped
// 3. retryFuture never completed
// 4. allOf(original, retry) never fires → finalFuture incomplete forever
fail(
"BUG: finalFuture hangs forever when original request fails with 429 before long-tail retry fires. "
+ "The retry timer is cancelled, 429 check skips error retry, and retryFuture is never completed.");
} catch (ExecutionException e) {
// CORRECT: future completed with the 429 exception
assertTrue(e.getCause() instanceof VeniceClientRateExceededException);
}
}

/**
* BUG REPRODUCTION (variant with shorter threshold): Same as above but uses a 20ms threshold
* to clearly demonstrate the timing: long-tail fires at 20ms (budget denied → does nothing),
* original fails at 200ms (timeoutFuture.isDone()=true → error retry skipped).
* Uses the setSingleKeyLongTailRetryManager test hook to inject a mock RetryManager for deterministic reproduction.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testFinalFutureHangsWhenLongTailFiresBeforeOriginalFailsAndBudgetExhausted() throws Exception {
int shortThresholdMicros = (int) TimeUnit.MILLISECONDS.toMicros(20); // 20ms

ClientConfig.ClientConfigBuilder builder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME)
.setR2Client(mock(com.linkedin.r2.transport.common.Client.class))
.setD2Client(mock(D2Client.class))
.setClusterDiscoveryD2Service("test_server_discovery")
.setLongTailRetryEnabledForSingleGet(true)
.setLongTailRetryThresholdForSingleGetInMicroSeconds(shortThresholdMicros)
.setMetricsRepository(getVeniceMetricsRepository(FAST_CLIENT, CLIENT_METRIC_ENTITIES, true));
ClientConfig testConfig = builder.build();

StoreMetadata mockMetadata = mock(StoreMetadata.class);
doReturn(STORE_NAME).when(mockMetadata).getStoreName();
doReturn(1).when(mockMetadata).getLatestValueSchemaId();
doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1);

InternalAvroStoreClient dispatchingClient =
new DispatchingAvroGenericStoreClient<Object, Object>(mockMetadata, testConfig) {
@Override
protected CompletableFuture get(GetRequestContext requestContext, Object key) throws VeniceClientException {
InstanceHealthMonitor instanceHealthMonitor = mock(InstanceHealthMonitor.class);
doReturn(timeoutProcessor).when(instanceHealthMonitor).getTimeoutProcessor();
requestContext.instanceHealthMonitor = instanceHealthMonitor;

final CompletableFuture future = new CompletableFuture();
// Fail slowly: 200ms >> 20ms threshold → long-tail fires first
scheduledExecutor.schedule(
() -> future.completeExceptionally(new VeniceClientException("Instance 500 error")),
200,
TimeUnit.MILLISECONDS);
return future;
}
};

RetriableAvroGenericStoreClient<String, GenericRecord> retryClient =
new RetriableAvroGenericStoreClient<>(dispatchingClient, testConfig, timeoutProcessor);

// Replace the RetryManager with a mock that always denies retries
RetryManager mockRetryManager = mock(RetryManager.class);
doReturn(false).when(mockRetryManager).isRetryAllowed();
doReturn(false).when(mockRetryManager).isRetryAllowed(anyInt());
retryClient.setSingleKeyLongTailRetryManager(mockRetryManager);

GetRequestContext bugCtx = new GetRequestContext();
CompletableFuture<GenericRecord> result = retryClient.get(bugCtx, "test_key");

try {
result.get(2, TimeUnit.SECONDS);
fail("Expected an ExecutionException");
} catch (TimeoutException e) {
fail(
"BUG: finalFuture hangs forever. Long-tail retry fired at 20ms but budget was exhausted (retryTask did "
+ "nothing). Original request failed at 200ms but error retry was skipped because "
+ "timeoutFuture.isDone()==true. retryFuture was never completed, blocking allOf() forever.");
} catch (ExecutionException e) {
// CORRECT behavior: future completed with exception
assertTrue(e.getCause() instanceof VeniceClientException);
}
}
}