[fast-client] Fix 3 fast client bugs where CompletableFutures are never completed #2478
base: main
Changes from all commits
ad6be9a
e05ed79
e5715ac
d41c63f
c449df8
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| import static com.linkedin.venice.schema.Utils.loadSchemaFileAsString; | ||
| import static com.linkedin.venice.stats.ClientType.FAST_CLIENT; | ||
| import static com.linkedin.venice.stats.VeniceMetricsRepository.getVeniceMetricsRepository; | ||
| import static org.mockito.ArgumentMatchers.anyInt; | ||
| import static org.mockito.Mockito.doReturn; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.testng.Assert.assertEquals; | ||
|
|
@@ -20,6 +21,7 @@ | |
| import com.linkedin.d2.balancer.D2Client; | ||
| import com.linkedin.r2.transport.common.Client; | ||
| import com.linkedin.venice.client.exceptions.VeniceClientException; | ||
| import com.linkedin.venice.client.exceptions.VeniceClientRateExceededException; | ||
| import com.linkedin.venice.client.store.ComputeGenericRecord; | ||
| import com.linkedin.venice.client.store.streaming.StreamingCallback; | ||
| import com.linkedin.venice.client.store.streaming.VeniceResponseMap; | ||
|
|
@@ -28,6 +30,7 @@ | |
| import com.linkedin.venice.fastclient.meta.StoreMetadata; | ||
| import com.linkedin.venice.fastclient.stats.FastClientStats; | ||
| import com.linkedin.venice.fastclient.utils.ClientTestUtils; | ||
| import com.linkedin.venice.meta.RetryManager; | ||
| import com.linkedin.venice.read.RequestType; | ||
| import com.linkedin.venice.utils.DataProviderUtils; | ||
| import com.linkedin.venice.utils.TestUtils; | ||
|
|
@@ -45,6 +48,7 @@ | |
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import org.apache.avro.Schema; | ||
| import org.apache.avro.generic.GenericData; | ||
| import org.apache.avro.generic.GenericRecord; | ||
|
|
@@ -848,4 +852,208 @@ public void testGetWithTriggeringLongTailRetryAndBothFailsV2(RequestType request | |
| testComputeAndValidateMetrics(true, true, false, false, false); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * BUG REPRODUCTION: When the long-tail retry fires but the retry budget is exhausted | ||
| * (isRetryAllowed() returns false), the retryTask does nothing and retryFuture is never completed. | ||
| * Later, when the original request fails, the error retry path is skipped because | ||
| * timeoutFuture.isDone() is true (long-tail already consumed it). This leaves finalFuture | ||
| * incomplete forever. | ||
| * | ||
| * This is the exact scenario observed in the mirror heap dump: instance returns 500 on heartbeat, | ||
| * many requests time out exhausting the retry budget, subsequent requests hang forever. | ||
| * | ||
| * This test uses the {@code setSingleKeyLongTailRetryManager(...)} test hook to install a | ||
| * {@link RetryManager} that always denies retries, ensuring deterministic reproduction | ||
| * of this scenario regardless of timing. | ||
| */ | ||
| @Test(timeOut = TEST_TIMEOUT) | ||
| public void testFinalFutureHangsWhenRetryBudgetExhaustedAndOriginalFails() throws Exception { | ||
| clientConfigBuilder.setMetricsRepository(getVeniceMetricsRepository(FAST_CLIENT, CLIENT_METRIC_ENTITIES, true)); | ||
| clientConfig = clientConfigBuilder.build(); | ||
|
|
||
| StoreMetadata mockMetadata = mock(StoreMetadata.class); | ||
| doReturn(STORE_NAME).when(mockMetadata).getStoreName(); | ||
| doReturn(1).when(mockMetadata).getLatestValueSchemaId(); | ||
| doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1); | ||
|
|
||
| // Original request is slow: completes AFTER long-tail retry threshold with an error. | ||
| // This ensures the long-tail retry fires first, then original fails. | ||
| InternalAvroStoreClient dispatchingClient = | ||
| new DispatchingAvroGenericStoreClient<Object, Object>(mockMetadata, clientConfig) { | ||
| @Override | ||
| protected CompletableFuture get(GetRequestContext requestContext, Object key) throws VeniceClientException { | ||
| InstanceHealthMonitor instanceHealthMonitor = mock(InstanceHealthMonitor.class); | ||
| doReturn(timeoutProcessor).when(instanceHealthMonitor).getTimeoutProcessor(); | ||
| requestContext.instanceHealthMonitor = instanceHealthMonitor; | ||
|
|
||
| final CompletableFuture originalRequestFuture = new CompletableFuture(); | ||
| scheduledExecutor.schedule( | ||
| () -> originalRequestFuture | ||
| .completeExceptionally(new VeniceClientException("Instance unhealthy, 500 error")), | ||
| LONG_TAIL_RETRY_THRESHOLD_IN_MS * 3, // 300ms >> 100ms threshold | ||
| TimeUnit.MILLISECONDS); | ||
| return originalRequestFuture; | ||
| } | ||
| }; | ||
|
|
||
| retriableClient = new RetriableAvroGenericStoreClient<>(dispatchingClient, clientConfig, timeoutProcessor); | ||
|
|
||
| // Replace the internal singleKeyLongTailRetryManager with a mock | ||
| // that always returns false for isRetryAllowed() (simulating exhausted budget) | ||
| RetryManager mockRetryManager = mock(RetryManager.class); | ||
| doReturn(false).when(mockRetryManager).isRetryAllowed(); | ||
| doReturn(false).when(mockRetryManager).isRetryAllowed(anyInt()); | ||
| retriableClient.setSingleKeyLongTailRetryManager(mockRetryManager); | ||
|
|
||
| GetRequestContext ctx = new GetRequestContext(); | ||
| CompletableFuture<GenericRecord> result = retriableClient.get(ctx, "test_key"); | ||
|
|
||
| try { | ||
| result.get(2, TimeUnit.SECONDS); | ||
| fail("Expected an ExecutionException"); | ||
| } catch (TimeoutException e) { | ||
| // BUG CONFIRMED: finalFuture never completed. | ||
| // Timeline: long-tail retry fires at 100ms → retryTask runs → isRetryAllowed()=false → does nothing → | ||
| // retryFuture stays incomplete → original fails at 300ms → timeoutFuture.isDone()=true → error retry skipped | ||
| // → allOf(original, retryFuture) never completes → finalFuture hangs forever. | ||
| fail( | ||
| "BUG: finalFuture hangs forever when retry budget is exhausted and original request fails after " | ||
| + "long-tail retry timer fires. retryFuture is never completed because retryTask does nothing " | ||
| + "when isRetryAllowed() returns false, and the error retry path is skipped because " | ||
| + "timeoutFuture.isDone() is true."); | ||
| } catch (ExecutionException e) { | ||
| // CORRECT: future completed with an exception (this means the bug is fixed) | ||
| assertTrue(e.getCause() instanceof VeniceClientException); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * BUG REPRODUCTION: When original request fails with HTTP 429 (Too Many Requests), | ||
| * the error retry is intentionally skipped (429 should not be retried). But when the | ||
| * long-tail retry timer has NOT yet fired, the timer is cancelled, and retryFuture is | ||
| * never completed. CompletableFuture.allOf(original, retry) never completes, so | ||
| * finalFuture hangs forever. | ||
| */ | ||
| @Test(timeOut = TEST_TIMEOUT) | ||
| public void testFinalFutureHangsWhenOriginalFailsWith429BeforeLongTailRetry() | ||
| throws InterruptedException, ExecutionException, TimeoutException { | ||
| clientConfigBuilder.setMetricsRepository(getVeniceMetricsRepository(FAST_CLIENT, CLIENT_METRIC_ENTITIES, true)); | ||
| clientConfig = clientConfigBuilder.build(); | ||
|
|
||
| StoreMetadata mockMetadata = mock(StoreMetadata.class); | ||
| doReturn(STORE_NAME).when(mockMetadata).getStoreName(); | ||
| doReturn(1).when(mockMetadata).getLatestValueSchemaId(); | ||
| doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1); | ||
|
|
||
| // Original request fails fast with 429 (BEFORE long-tail retry threshold) | ||
| InternalAvroStoreClient dispatchingClient = | ||
| new DispatchingAvroGenericStoreClient<Object, Object>(mockMetadata, clientConfig) { | ||
| @Override | ||
| protected CompletableFuture get(GetRequestContext requestContext, Object key) throws VeniceClientException { | ||
| InstanceHealthMonitor instanceHealthMonitor = mock(InstanceHealthMonitor.class); | ||
| doReturn(timeoutProcessor).when(instanceHealthMonitor).getTimeoutProcessor(); | ||
| requestContext.instanceHealthMonitor = instanceHealthMonitor; | ||
|
|
||
| final CompletableFuture future = new CompletableFuture(); | ||
| // Fail immediately with 429 - well before the 100ms long-tail retry threshold | ||
| scheduledExecutor.schedule( | ||
| () -> future.completeExceptionally(new VeniceClientRateExceededException("Too many requests")), | ||
| 5, | ||
| TimeUnit.MILLISECONDS); | ||
| return future; | ||
| } | ||
| }; | ||
|
|
||
| retriableClient = new RetriableAvroGenericStoreClient<>(dispatchingClient, clientConfig, timeoutProcessor); | ||
|
|
||
| GetRequestContext ctx = new GetRequestContext(); | ||
| CompletableFuture<GenericRecord> result = retriableClient.get(ctx, "test_key"); | ||
|
|
||
| try { | ||
| // If the bug exists, this will timeout because retryFuture is never completed | ||
| result.get(2, TimeUnit.SECONDS); | ||
| fail("Expected an ExecutionException from 429"); | ||
| } catch (TimeoutException e) { | ||
| // BUG CONFIRMED: finalFuture hangs because: | ||
| // 1. Original fails with 429 → savedException set, timeoutFuture cancelled | ||
| // 2. isExceptionCausedByTooManyRequests(429) → true → error retry skipped | ||
| // 3. retryFuture never completed | ||
| // 4. allOf(original, retry) never fires → finalFuture incomplete forever | ||
| fail( | ||
| "BUG: finalFuture hangs forever when original request fails with 429 before long-tail retry fires. " | ||
| + "The retry timer is cancelled, 429 check skips error retry, and retryFuture is never completed."); | ||
| } catch (ExecutionException e) { | ||
| // CORRECT: future completed with the 429 exception | ||
| assertTrue(e.getCause() instanceof VeniceClientRateExceededException); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /** | ||
| * BUG REPRODUCTION (variant with shorter threshold): Same as above but uses a 20ms threshold | ||
| * to clearly demonstrate the timing: long-tail fires at 20ms (budget denied → does nothing), | ||
| * original fails at 200ms (timeoutFuture.isDone()=true → error retry skipped). | ||
| * Uses {@code setSingleKeyLongTailRetryManager(...)} to inject a mock RetryManager for deterministic reproduction. | ||
| */ | ||
| @Test(timeOut = TEST_TIMEOUT) | ||
| public void testFinalFutureHangsWhenLongTailFiresBeforeOriginalFailsAndBudgetExhausted() throws Exception { | ||
| int shortThresholdMicros = (int) TimeUnit.MILLISECONDS.toMicros(20); // 20ms | ||
|
|
||
| ClientConfig.ClientConfigBuilder builder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME) | ||
| .setR2Client(mock(com.linkedin.r2.transport.common.Client.class)) | ||
| .setD2Client(mock(D2Client.class)) | ||
| .setClusterDiscoveryD2Service("test_server_discovery") | ||
| .setLongTailRetryEnabledForSingleGet(true) | ||
| .setLongTailRetryThresholdForSingleGetInMicroSeconds(shortThresholdMicros) | ||
| .setMetricsRepository(getVeniceMetricsRepository(FAST_CLIENT, CLIENT_METRIC_ENTITIES, true)); | ||
| ClientConfig testConfig = builder.build(); | ||
|
|
||
| StoreMetadata mockMetadata = mock(StoreMetadata.class); | ||
| doReturn(STORE_NAME).when(mockMetadata).getStoreName(); | ||
| doReturn(1).when(mockMetadata).getLatestValueSchemaId(); | ||
| doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1); | ||
|
|
||
| InternalAvroStoreClient dispatchingClient = | ||
| new DispatchingAvroGenericStoreClient<Object, Object>(mockMetadata, testConfig) { | ||
| @Override | ||
| protected CompletableFuture get(GetRequestContext requestContext, Object key) throws VeniceClientException { | ||
| InstanceHealthMonitor instanceHealthMonitor = mock(InstanceHealthMonitor.class); | ||
| doReturn(timeoutProcessor).when(instanceHealthMonitor).getTimeoutProcessor(); | ||
| requestContext.instanceHealthMonitor = instanceHealthMonitor; | ||
|
|
||
| final CompletableFuture future = new CompletableFuture(); | ||
| // Fail slowly: 200ms >> 20ms threshold → long-tail fires first | ||
| scheduledExecutor.schedule( | ||
| () -> future.completeExceptionally(new VeniceClientException("Instance 500 error")), | ||
| 200, | ||
| TimeUnit.MILLISECONDS); | ||
| return future; | ||
| } | ||
| }; | ||
|
|
||
| RetriableAvroGenericStoreClient<String, GenericRecord> retryClient = | ||
| new RetriableAvroGenericStoreClient<>(dispatchingClient, testConfig, timeoutProcessor); | ||
|
|
||
| // Replace the RetryManager with a mock that always denies retries | ||
| RetryManager mockRetryManager = mock(RetryManager.class); | ||
| doReturn(false).when(mockRetryManager).isRetryAllowed(); | ||
| doReturn(false).when(mockRetryManager).isRetryAllowed(anyInt()); | ||
| retryClient.setSingleKeyLongTailRetryManager(mockRetryManager); | ||
|
|
||
| GetRequestContext bugCtx = new GetRequestContext(); | ||
| CompletableFuture<GenericRecord> result = retryClient.get(bugCtx, "test_key"); | ||
|
|
||
| try { | ||
| result.get(2, TimeUnit.SECONDS); | ||
| fail("Expected an ExecutionException"); | ||
| } catch (TimeoutException e) { | ||
| fail( | ||
| "BUG: finalFuture hangs forever. Long-tail retry fired at 20ms but budget was exhausted (retryTask did " | ||
| + "nothing). Original request failed at 200ms but error retry was skipped because " | ||
| + "timeoutFuture.isDone()==true. retryFuture was never completed, blocking allOf() forever."); | ||
| } catch (ExecutionException e) { | ||
| // CORRECT behavior: future completed with exception | ||
| assertTrue(e.getCause() instanceof VeniceClientException); | ||
| } | ||
|
|
||
| } | ||
| } | ||
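Taken together, the three tests pin down a single invariant: every code path that arms the long-tail retry must eventually complete the retry placeholder future; otherwise `CompletableFuture.allOf(original, retry)`, and with it `finalFuture`, can never finish. The sketch below is illustrative only. It is not the PR's actual change, and `RetryPlaceholderSketch`, the `retryAllowed` flag, and the `IllegalStateException` are stand-ins; it simply shows one shape of wiring that satisfies the invariant described in the test javadocs above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustrative only (not the PR's actual change): the invariant the new tests check is that
// the retry placeholder future is completed on EVERY path, otherwise
// CompletableFuture.allOf(original, retry), and therefore finalFuture, can never finish.
final class RetryPlaceholderSketch {
  static <T> CompletableFuture<T> withRetryPlaceholder(
      CompletableFuture<T> original,
      boolean retryAllowed, // stand-in for a RetryManager-style isRetryAllowed() decision
      Supplier<CompletableFuture<T>> retryCall) {
    CompletableFuture<T> retry = new CompletableFuture<>();
    if (retryAllowed) {
      // Relay the real retry outcome into the placeholder.
      retryCall.get().whenComplete((value, error) -> {
        if (error != null) {
          retry.completeExceptionally(error);
        } else {
          retry.complete(value);
        }
      });
    } else {
      // The buggy path the tests reproduce simply returned here, leaving `retry` incomplete.
      // Completing it (here: exceptionally) keeps the allOf(...) below from hanging.
      retry.completeExceptionally(new IllegalStateException("retry budget exhausted"));
    }

    CompletableFuture<T> finalFuture = new CompletableFuture<>();
    CompletableFuture.allOf(original, retry).whenComplete((ignored, error) -> {
      // Both attempts are finished at this point; prefer whichever one succeeded.
      if (!original.isCompletedExceptionally()) {
        finalFuture.complete(original.join());
      } else if (!retry.isCompletedExceptionally()) {
        finalFuture.complete(retry.join());
      } else {
        finalFuture.completeExceptionally(error);
      }
    });
    return finalFuture;
  }
}
```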
The assertions in these new tests assume `result.get()` fails with an `ExecutionException` whose direct cause is `VeniceClientException`. However, `RetriableAvroGenericStoreClient#get()` completes `finalFuture` from a `CompletableFuture.allOf(...)` callback, and `allOf` completes exceptionally with a `CompletionException` wrapper. As a result, `e.getCause()` is likely `CompletionException` (with the Venice exception as its cause). Update the assertion to unwrap `CompletionException` (or use `ExceptionUtils.recursiveClassEquals`) so the test matches the actual exception shape.
The assertion is correct as-is.
CompletableFuture.allOf()usescompleteRelayinternally which propagates the rawAltResultfrom the source future without wrapping inCompletionException.CompletionExceptionwrapping only happens when exceptions propagate through transformation stages (thenApply,thenCompose, etc.), not throughallOf. Since bothoriginalRequestFutureandretryFutureare completed viacompleteExceptionally(), the throwable passed to theallOf().whenComplete()callback is the rawVeniceClientException, which is then passed tofinalFuture.completeExceptionally(). Soresult.get()throwsExecutionException(VeniceClientException)ande.getCause() instanceof VeniceClientExceptionis correct. All 4 tests pass with this assertion.