From 4a20cdf03d362989bfcdf0ee22ffd7f2038fe50a Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Fri, 26 Jul 2024 14:06:52 -0700 Subject: [PATCH] [router][common][fast-client] Add RetryManager to venice-router (#1058) * [router][common][fast-client] Add RetryManager to venice-router for long tail retry 1. Move RetryManager to venice-common so it can be used by both fast-client and venice-router. Refactored the constructor a bit so in Venice router we can initialize instances of RetryManager per store (max 2, single-get and multi-get) without initializing multiple schedulers. 2. RetryManager for fast-client has store level resource isolation because a fast-client instance is mapped to a single Venice store. RetryManager for venice-router in this change is also made to have store level resource isolation. 3. Updated the fast-client RetryManagerStats with store name in metric prefix to avoid conflict/confusion since a user host could have multiple instances of fast-client for multiple stores. --- .../RetriableAvroGenericStoreClient.java | 47 ++++----- .../fastclient/meta/RetryManagerTest.java | 68 ------------- .../java/com/linkedin/venice/ConfigKeys.java | 27 +++++ .../linkedin/venice}/meta/RetryManager.java | 46 +++++---- .../venice}/stats/RetryManagerStats.java | 9 +- .../venice/meta/RetryManagerTest.java | 73 ++++++++++++++ ...entIndividualFeatureConfigurationTest.java | 18 ++-- .../venice/router/TestRouterRetry.java | 99 +++++++++++++++++-- .../linkedin/venice/router/RouterServer.java | 19 +++- .../venice/router/VeniceRouterConfig.java | 32 ++++++ .../venice/router/api/VeniceDelegateMode.java | 9 +- .../venice/router/api/VenicePathParser.java | 55 ++++++++++- .../router/api/path/VeniceComputePath.java | 19 ++-- .../router/api/path/VeniceMultiGetPath.java | 16 ++- .../router/api/path/VeniceMultiKeyPath.java | 20 +++- .../venice/router/api/path/VenicePath.java | 22 ++++- .../router/api/path/VeniceSingleGetPath.java | 6 +- .../router/api/TestVeniceDelegateMode.java | 37 
++++++- .../router/api/TestVenicePathParser.java | 21 +++- .../api/path/TestVeniceComputePath.java | 10 +- .../api/path/TestVeniceMultiGetPath.java | 25 +++-- .../router/api/path/TestVenicePath.java | 85 +++++++++++++--- 22 files changed, 567 insertions(+), 196 deletions(-) delete mode 100644 clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RetryManagerTest.java rename {clients/venice-client/src/main/java/com/linkedin/venice/fastclient => internal/venice-common/src/main/java/com/linkedin/venice}/meta/RetryManager.java (82%) rename {clients/venice-client/src/main/java/com/linkedin/venice/fastclient => internal/venice-common/src/main/java/com/linkedin/venice}/stats/RetryManagerStats.java (84%) create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/meta/RetryManagerTest.java diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java index 4ae3b60f1c7..7502fada451 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.java @@ -8,12 +8,15 @@ import com.linkedin.venice.client.store.streaming.StreamingCallback; import com.linkedin.venice.compute.ComputeRequestWrapper; import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.fastclient.meta.RetryManager; +import com.linkedin.venice.meta.RetryManager; +import com.linkedin.venice.utils.DaemonThreadFactory; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.Collections; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicReference; import org.apache.avro.Schema; @@ -31,6 +34,7 @@ * 2. Leverage some smart logic to avoid useless retry, such as retry triggered by heavy GC. */ public class RetriableAvroGenericStoreClient extends DelegatingAvroStoreClient { + private static final String FAST_CLIENT_RETRY_MANAGER_THREAD_PREFIX = "Fast-client-retry-manager-thread"; private final boolean longTailRetryEnabledForSingleGet; private final boolean longTailRetryEnabledForBatchGet; private final boolean longTailRetryEnabledForCompute; @@ -38,18 +42,20 @@ public class RetriableAvroGenericStoreClient extends DelegatingAvroStoreCl private final int longTailRetryThresholdForBatchGetInMicroSeconds; private final int longTailRetryThresholdForComputeInMicroSeconds; private final TimeoutProcessor timeoutProcessor; + private final ScheduledExecutorService retryManagerExecutorService = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory(FAST_CLIENT_RETRY_MANAGER_THREAD_PREFIX)); /** * The long tail retry budget is only applied to long tail retries. If there were any exception that's not a 429 the - * retry will be triggered without going through the long tail {@link RetryManager}. If the retry budget is exhausted + * retry will be triggered without going through the long tail {@link com.linkedin.venice.meta.RetryManager}. If the retry budget is exhausted * then the retry task will do nothing and the request will either complete eventually (original future) or time out. 
*/ - private RetryManager singleGetLongTailRetryManager = null; - private RetryManager multiGetLongTailRetryManager = null; + private RetryManager singleKeyLongTailRetryManager = null; + private RetryManager multiKeyLongTailRetryManager = null; private static final Logger LOGGER = LogManager.getLogger(RetriableAvroGenericStoreClient.class); // Default value of 0.1 meaning only 10 percent of the user requests are allowed to trigger long tail retry private static final double LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL = 0.1d; - private static final String SINGLE_GET_LONG_TAIL_RETRY_STATS_PREFIX = "single-get-long-tail-retry-manager"; - private static final String MULTI_GET_LONG_TAIL_RETRY_STATS_PREFIX = "multi-get-long-tail-retry-manager"; + private static final String SINGLE_KEY_LONG_TAIL_RETRY_STATS_PREFIX = "single-key-long-tail-retry-manager-"; + private static final String MULTI_KEY_LONG_TAIL_RETRY_STATS_PREFIX = "multi-key-long-tail-retry-manager-"; public RetriableAvroGenericStoreClient( InternalAvroStoreClient delegate, @@ -71,18 +77,20 @@ public RetriableAvroGenericStoreClient( clientConfig.getLongTailRetryThresholdForComputeInMicroSeconds(); this.timeoutProcessor = timeoutProcessor; if (longTailRetryEnabledForSingleGet) { - this.singleGetLongTailRetryManager = new RetryManager( + this.singleKeyLongTailRetryManager = new RetryManager( clientConfig.getClusterStats().getMetricsRepository(), - SINGLE_GET_LONG_TAIL_RETRY_STATS_PREFIX, + SINGLE_KEY_LONG_TAIL_RETRY_STATS_PREFIX + clientConfig.getStoreName(), clientConfig.getLongTailRetryBudgetEnforcementWindowInMs(), - LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL); + LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, + retryManagerExecutorService); } if (longTailRetryEnabledForBatchGet) { - this.multiGetLongTailRetryManager = new RetryManager( + this.multiKeyLongTailRetryManager = new RetryManager( clientConfig.getClusterStats().getMetricsRepository(), - MULTI_GET_LONG_TAIL_RETRY_STATS_PREFIX, + MULTI_KEY_LONG_TAIL_RETRY_STATS_PREFIX 
+ clientConfig.getStoreName(), clientConfig.getLongTailRetryBudgetEnforcementWindowInMs(), - LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL); + LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, + retryManagerExecutorService); } } @@ -133,7 +141,7 @@ protected CompletableFuture get(GetRequestContext requestContext, K key) thro // if longTailRetry is not enabled for single get, simply return the original future return originalRequestFuture; } - singleGetLongTailRetryManager.recordRequest(); + singleKeyLongTailRetryManager.recordRequest(); final CompletableFuture retryFuture = new CompletableFuture<>(); final CompletableFuture finalFuture = new CompletableFuture<>(); @@ -145,7 +153,7 @@ protected CompletableFuture get(GetRequestContext requestContext, K key) thro retryFuture.completeExceptionally(savedException.get()); return; } - if (savedException.get() != null || singleGetLongTailRetryManager.isRetryAllowed()) { + if (savedException.get() != null || singleKeyLongTailRetryManager.isRetryAllowed()) { super.get(requestContext, key).whenComplete((value, throwable) -> { if (throwable != null) { retryFuture.completeExceptionally(throwable); @@ -261,12 +269,7 @@ public void compute( @Override public void close() { - if (singleGetLongTailRetryManager != null) { - singleGetLongTailRetryManager.close(); - } - if (multiGetLongTailRetryManager != null) { - multiGetLongTailRetryManager.close(); - } + retryManagerExecutorService.shutdownNow(); super.close(); } @@ -309,7 +312,7 @@ private , RESPONSE> void retryStreamingMu finalRequestCompletionFuture.completeExceptionally(throwable); return; } - if (throwable != null || multiGetLongTailRetryManager.isRetryAllowed()) { + if (throwable != null || multiKeyLongTailRetryManager.isRetryAllowed()) { Set pendingKeys = Collections.unmodifiableSet(pendingKeysFuture.keySet()); R retryRequestContext = requestContextConstructor.construct(pendingKeys.size(), requestContext.isPartialSuccessAllowed); @@ -357,7 +360,7 @@ private , RESPONSE> void retryStreamingMu 
savedException, pendingKeysFuture, scheduledRetryTask)); - multiGetLongTailRetryManager.recordRequest(); + multiKeyLongTailRetryManager.recordRequest(); finalRequestCompletionFuture.whenComplete((ignore, finalException) -> { if (!scheduledRetryTask.isDone()) { diff --git a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RetryManagerTest.java b/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RetryManagerTest.java deleted file mode 100644 index 75501188b31..00000000000 --- a/clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RetryManagerTest.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.linkedin.venice.fastclient.meta; - -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; - -import com.linkedin.venice.utils.TestUtils; -import io.tehuti.metrics.MetricsRepository; -import java.io.IOException; -import java.time.Clock; -import java.util.concurrent.TimeUnit; -import org.testng.Assert; -import org.testng.annotations.Test; - - -public class RetryManagerTest { - private static final long TEST_TIMEOUT_IN_MS = 10000; - - @Test(timeOut = TEST_TIMEOUT_IN_MS) - public void testRetryManagerDisabled() throws IOException { - Clock mockClock = mock(Clock.class); - long start = System.currentTimeMillis(); - doReturn(start).when(mockClock).millis(); - MetricsRepository metricsRepository = new MetricsRepository(); - RetryManager retryManager = new RetryManager(metricsRepository, "test-retry-manager", 0, 0.1d, mockClock); - retryManager.recordRequest(); - for (int i = 0; i < 10; i++) { - Assert.assertTrue(retryManager.isRetryAllowed()); - } - Assert.assertNull(retryManager.getRetryTokenBucket()); - Assert.assertNull(metricsRepository.getMetric(".test-retry-manager--retry_limit_per_seconds.Gauge")); - retryManager.close(); - } - - @Test(timeOut = TEST_TIMEOUT_IN_MS) - public void testRetryManager() throws IOException { - Clock mockClock = mock(Clock.class); - long start = 
System.currentTimeMillis(); - doReturn(start).when(mockClock).millis(); - MetricsRepository metricsRepository = new MetricsRepository(); - try (RetryManager retryManager = new RetryManager(metricsRepository, "test-retry-manager", 1000, 0.1d, mockClock)) { - doReturn(start + 1000).when(mockClock).millis(); - for (int i = 0; i < 50; i++) { - retryManager.recordRequest(); - } - TestUtils.waitForNonDeterministicAssertion( - 5, - TimeUnit.SECONDS, - () -> Assert.assertNotNull(retryManager.getRetryTokenBucket())); - // The retry budget should be set to 50 * 0.1 = 5 - // With refill interval of 1s and capacity multiple of 5 that makes the token bucket capacity of 25 - Assert - .assertEquals(metricsRepository.getMetric(".test-retry-manager--retry_limit_per_seconds.Gauge").value(), 5d); - Assert.assertEquals(metricsRepository.getMetric(".test-retry-manager--retries_remaining.Gauge").value(), 25d); - for (int i = 0; i < 25; i++) { - Assert.assertTrue(retryManager.isRetryAllowed()); - } - Assert.assertFalse(retryManager.isRetryAllowed()); - Assert.assertEquals(metricsRepository.getMetric(".test-retry-manager--retries_remaining.Gauge").value(), 0d); - Assert.assertTrue(metricsRepository.getMetric(".test-retry-manager--rejected_retry.OccurrenceRate").value() > 0); - doReturn(start + 2001).when(mockClock).millis(); - // We should eventually be able to perform retries again - TestUtils.waitForNonDeterministicAssertion( - 5, - TimeUnit.SECONDS, - () -> Assert.assertTrue(retryManager.isRetryAllowed())); - } - } -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index e81d042d2cb..e213c96808e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2085,4 +2085,31 @@ private ConfigKeys() { * Only used in batch push jobs and partial updates. 
*/ public static final String CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES = "controller.default.max.record.size.bytes"; + + /** + * Percentage of total single get requests that are allowed for retry in decimal. e.g. 0.1 would mean up to 10% of the + * total single get requests are allowed for long tail retry. This is to prevent retry storm and cascading failures. + */ + public static final String ROUTER_SINGLE_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL = + "router.single.key.long.tail.retry.budget.percent.decimal"; + + /** + * Percentage of total multi get requests that are allowed for retry in decimal. e.g. 0.1 would mean up to 10% of the + * total multi get requests are allowed for long tail retry. This is to prevent retry storm and cascading failures. + */ + public static final String ROUTER_MULTI_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL = + "router.multi.key.long.tail.retry.budget.percent.decimal"; + + /** + * Enforcement window for router long tail retry budget token bucket. This applies to both single get and multi get + * retry managers. + */ + public static final String ROUTER_LONG_TAIL_RETRY_BUDGET_ENFORCEMENT_WINDOW_MS = + "router.long.tail.retry.budget.enforcement.window.ms"; + + /** + * The core pool size for the thread pool executor which contains threads responsible for measuring and updating all + * retry managers in router periodically to provide retry budget based on a percentage of the original requests. 
+ */ + public static final String ROUTER_RETRY_MANAGER_CORE_POOL_SIZE = "router.retry.manager.core.pool.size"; } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RetryManager.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/RetryManager.java similarity index 82% rename from clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RetryManager.java rename to internal/venice-common/src/main/java/com/linkedin/venice/meta/RetryManager.java index f0f75d97502..cab8a0c63e0 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RetryManager.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/RetryManager.java @@ -1,11 +1,9 @@ -package com.linkedin.venice.fastclient.meta; +package com.linkedin.venice.meta; -import com.linkedin.venice.fastclient.stats.RetryManagerStats; +import com.linkedin.venice.stats.RetryManagerStats; import com.linkedin.venice.throttle.TokenBucket; import io.tehuti.metrics.MetricsRepository; -import java.io.Closeable; import java.time.Clock; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -15,11 +13,7 @@ import org.apache.logging.log4j.Logger; -/** - * This class offers advanced client retry behaviors. Specifically enforcing a retry budget and relevant monitoring to - * avoid retry storm and alert users when the retry threshold is misconfigured or service is degrading. 
- */ -public class RetryManager implements Closeable { +public class RetryManager { private static final Logger LOGGER = LogManager.getLogger(RetryManager.class); private static final int TOKEN_BUCKET_REFILL_INTERVAL_IN_SECONDS = 1; /** @@ -43,16 +37,16 @@ public RetryManager( String metricNamePrefix, long enforcementWindowInMs, double retryBudgetInPercentDecimal, - Clock clock) { + Clock clock, + ScheduledExecutorService scheduler) { + this.scheduler = scheduler; if (enforcementWindowInMs <= 0 || retryBudgetInPercentDecimal <= 0) { - scheduler = null; retryBudgetEnabled.set(false); } else { - scheduler = Executors.newScheduledThreadPool(1); retryBudgetEnabled.set(true); lastUpdateTimestamp = clock.millis(); retryManagerStats = new RetryManagerStats(metricsRepository, metricNamePrefix, this); - scheduler.schedule(this::updateRetryTokenBucket, enforcementWindowInMs, TimeUnit.MILLISECONDS); + this.scheduler.schedule(this::updateRetryTokenBucket, enforcementWindowInMs, TimeUnit.MILLISECONDS); } this.enforcementWindowInMs = enforcementWindowInMs; this.retryBudgetInPercentDecimal = retryBudgetInPercentDecimal; @@ -63,8 +57,15 @@ public RetryManager( MetricsRepository metricsRepository, String metricNamePrefix, long enforcementWindowInMs, - double retryBudgetInPercentDecimal) { - this(metricsRepository, metricNamePrefix, enforcementWindowInMs, retryBudgetInPercentDecimal, Clock.systemUTC()); + double retryBudgetInPercentDecimal, + ScheduledExecutorService scheduler) { + this( + metricsRepository, + metricNamePrefix, + enforcementWindowInMs, + retryBudgetInPercentDecimal, + Clock.systemUTC(), + scheduler); } public void recordRequest() { @@ -74,15 +75,19 @@ public void recordRequest() { } public boolean isRetryAllowed() { + return this.isRetryAllowed(1); + } + + public boolean isRetryAllowed(int numberOfRetries) { TokenBucket tokenBucket = retryTokenBucket.get(); if (!retryBudgetEnabled.get() || tokenBucket == null) { // All retries are allowed when the feature is 
disabled or during the very first enforcement window when we // haven't collected enough data points yet return true; } - boolean retryAllowed = retryTokenBucket.get().tryConsume(1); + boolean retryAllowed = retryTokenBucket.get().tryConsume(numberOfRetries); if (!retryAllowed) { - retryManagerStats.recordRejectedRetry(); + retryManagerStats.recordRejectedRetry(numberOfRetries); } return retryAllowed; } @@ -132,11 +137,4 @@ private void updateTokenBucket(long newQPS) { public TokenBucket getRetryTokenBucket() { return retryTokenBucket.get(); } - - @Override - public void close() { - if (scheduler != null) { - scheduler.shutdownNow(); - } - } } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/RetryManagerStats.java b/internal/venice-common/src/main/java/com/linkedin/venice/stats/RetryManagerStats.java similarity index 84% rename from clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/RetryManagerStats.java rename to internal/venice-common/src/main/java/com/linkedin/venice/stats/RetryManagerStats.java index 1abc3c7439e..8cebf4393f6 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/RetryManagerStats.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/stats/RetryManagerStats.java @@ -1,7 +1,6 @@ -package com.linkedin.venice.fastclient.stats; +package com.linkedin.venice.stats; -import com.linkedin.venice.fastclient.meta.RetryManager; -import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.throttle.TokenBucket; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; @@ -35,7 +34,7 @@ public RetryManagerStats(MetricsRepository metricsRepository, String name, Retry rejectedRetrySensor = registerSensorIfAbsent("rejected_retry", new OccurrenceRate()); } - public void recordRejectedRetry() { - rejectedRetrySensor.record(); + public void recordRejectedRetry(int count) { + 
rejectedRetrySensor.record(count); } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/RetryManagerTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/RetryManagerTest.java new file mode 100644 index 00000000000..02614ed9a5f --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/RetryManagerTest.java @@ -0,0 +1,73 @@ +package com.linkedin.venice.meta; + +import static org.mockito.Mockito.*; + +import com.linkedin.venice.utils.TestUtils; +import io.tehuti.metrics.MetricsRepository; +import java.io.IOException; +import java.time.Clock; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + + +public class RetryManagerTest { + private static final long TEST_TIMEOUT_IN_MS = 10000; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + @AfterClass + public void cleanUp() { + scheduler.shutdownNow(); + } + + @Test(timeOut = TEST_TIMEOUT_IN_MS) + public void testRetryManagerDisabled() throws IOException { + Clock mockClock = mock(Clock.class); + long start = System.currentTimeMillis(); + doReturn(start).when(mockClock).millis(); + MetricsRepository metricsRepository = new MetricsRepository(); + RetryManager retryManager = + new RetryManager(metricsRepository, "test-retry-manager", 0, 0.1d, mockClock, scheduler); + retryManager.recordRequest(); + for (int i = 0; i < 10; i++) { + Assert.assertTrue(retryManager.isRetryAllowed()); + } + Assert.assertNull(retryManager.getRetryTokenBucket()); + Assert.assertNull(metricsRepository.getMetric(".test-retry-manager--retry_limit_per_seconds.Gauge")); + } + + @Test(timeOut = TEST_TIMEOUT_IN_MS) + public void testRetryManager() throws IOException { + Clock mockClock = mock(Clock.class); + long start = System.currentTimeMillis(); + 
doReturn(start).when(mockClock).millis(); + MetricsRepository metricsRepository = new MetricsRepository(); + RetryManager retryManager = + new RetryManager(metricsRepository, "test-retry-manager", 1000, 0.1d, mockClock, scheduler); + doReturn(start + 1000).when(mockClock).millis(); + for (int i = 0; i < 50; i++) { + retryManager.recordRequest(); + } + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert.assertNotNull(retryManager.getRetryTokenBucket())); + // The retry budget should be set to 50 * 0.1 = 5 + // With refill interval of 1s and capacity multiple of 5 that makes the token bucket capacity of 25 + Assert.assertEquals(metricsRepository.getMetric(".test-retry-manager--retry_limit_per_seconds.Gauge").value(), 5d); + Assert.assertEquals(metricsRepository.getMetric(".test-retry-manager--retries_remaining.Gauge").value(), 25d); + for (int i = 0; i < 25; i++) { + Assert.assertTrue(retryManager.isRetryAllowed()); + } + Assert.assertFalse(retryManager.isRetryAllowed()); + Assert.assertEquals(metricsRepository.getMetric(".test-retry-manager--retries_remaining.Gauge").value(), 0d); + Assert.assertTrue(metricsRepository.getMetric(".test-retry-manager--rejected_retry.OccurrenceRate").value() > 0); + doReturn(start + 2001).when(mockClock).millis(); + // We should eventually be able to perform retries again + TestUtils + .waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> Assert.assertTrue(retryManager.isRetryAllowed())); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java index f123db237ea..505f63dc742 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java +++ 
b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java @@ -410,8 +410,8 @@ public void testLongTailRetryManagerStats() throws IOException, ExecutionExcepti .setLongTailRetryThresholdForBatchGetInMicroSeconds(10000) .setLongTailRetryBudgetEnforcementWindowInMs(1000) .setSpeculativeQueryEnabled(false); - String multiGetLongTailRetryManagerStatsPrefix = ".multi-get-long-tail-retry-manager--"; - String singleGetLongTailRetryManagerStatsPrefix = ".single-get-long-tail-retry-manager--"; + String multiKeyLongTailRetryManagerStatsPrefix = ".multi-key-long-tail-retry-manager-" + storeName + "--"; + String singleKeyLongTailRetryManagerStatsPrefix = ".single-key-long-tail-retry-manager-" + storeName + "--"; MetricsRepository clientMetric = new MetricsRepository(); AvroGenericStoreClient genericFastClient = getGenericFastClient(clientConfigBuilder, clientMetric, StoreMetadataFetchMode.SERVER_BASED_METADATA); @@ -427,19 +427,19 @@ public void testLongTailRetryManagerStats() throws IOException, ExecutionExcepti 10, TimeUnit.SECONDS, () -> assertTrue( - clientMetric.getMetric(multiGetLongTailRetryManagerStatsPrefix + "retry_limit_per_seconds.Gauge") + clientMetric.getMetric(multiKeyLongTailRetryManagerStatsPrefix + "retry_limit_per_seconds.Gauge") .value() > 0, "Current value: " - + clientMetric.getMetric(multiGetLongTailRetryManagerStatsPrefix + "retry_limit_per_seconds.Gauge") + + clientMetric.getMetric(multiKeyLongTailRetryManagerStatsPrefix + "retry_limit_per_seconds.Gauge") .value())); - assertTrue(clientMetric.getMetric(multiGetLongTailRetryManagerStatsPrefix + "retries_remaining.Gauge").value() > 0); + assertTrue(clientMetric.getMetric(multiKeyLongTailRetryManagerStatsPrefix + "retries_remaining.Gauge").value() > 0); assertEquals( - clientMetric.getMetric(multiGetLongTailRetryManagerStatsPrefix + "rejected_retry.OccurrenceRate").value(), + 
clientMetric.getMetric(multiKeyLongTailRetryManagerStatsPrefix + "rejected_retry.OccurrenceRate").value(), 0d); // single get long tail retry manager metrics shouldn't be initialized because it's not enabled - assertNull(clientMetric.getMetric(singleGetLongTailRetryManagerStatsPrefix + "retry_limit_per_seconds.Gauge")); - assertNull(clientMetric.getMetric(singleGetLongTailRetryManagerStatsPrefix + "retries_remaining.Gauge")); - assertNull(clientMetric.getMetric(singleGetLongTailRetryManagerStatsPrefix + "rejected_retry.OccurrenceRate")); + assertNull(clientMetric.getMetric(singleKeyLongTailRetryManagerStatsPrefix + "retry_limit_per_seconds.Gauge")); + assertNull(clientMetric.getMetric(singleKeyLongTailRetryManagerStatsPrefix + "retries_remaining.Gauge")); + assertNull(clientMetric.getMetric(singleKeyLongTailRetryManagerStatsPrefix + "rejected_retry.OccurrenceRate")); } @Test(timeOut = TIME_OUT) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java index 8ada96feb06..dadf4f7d5b4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestRouterRetry.java @@ -11,6 +11,7 @@ import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.Version; @@ -35,8 +36,8 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.testng.Assert; -import org.testng.annotations.AfterClass; 
-import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -50,6 +51,9 @@ public class TestRouterRetry { private int valueSchemaId; private String storeName; + // default extraProperties + private Properties extraProperties; + private static final String KEY_SCHEMA_STR = "\"string\""; private static final String VALUE_FIELD_NAME = "int_field"; private static final String VALUE_SCHEMA_STR = @@ -58,15 +62,14 @@ public class TestRouterRetry { private static final Schema VALUE_SCHEMA = new Schema.Parser().parse(VALUE_SCHEMA_STR); private static final String KEY_PREFIX = "key_"; - @BeforeClass(alwaysRun = true) - public void setUp() throws VeniceClientException, ExecutionException, InterruptedException { - Utils.thisIsLocalhost(); - Properties extraProperties = new Properties(); + @BeforeMethod(alwaysRun = true) + public void setUp() { + extraProperties = new Properties(); // Add the following specific configs for Router // To trigger long-tail retry extraProperties.put(ConfigKeys.ROUTER_LONG_TAIL_RETRY_FOR_SINGLE_GET_THRESHOLD_MS, 1); extraProperties.put(ConfigKeys.ROUTER_MAX_KEY_COUNT_IN_MULTIGET_REQ, MAX_KEY_LIMIT); // 10 keys at most in a - // batch-get request + // batch-get request extraProperties.put(ConfigKeys.ROUTER_LONG_TAIL_RETRY_FOR_BATCH_GET_THRESHOLD_MS, "1-:1"); extraProperties.put(ConfigKeys.ROUTER_SMART_LONG_TAIL_RETRY_ENABLED, true); @@ -74,8 +77,20 @@ public void setUp() throws VeniceClientException, ExecutionException, Interrupte extraProperties.put( ConfigKeys.DEFAULT_OFFLINE_PUSH_STRATEGY, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION.toString()); + } - veniceCluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, true, false, extraProperties); + private void initCluster() throws VeniceClientException, ExecutionException, InterruptedException { + Utils.thisIsLocalhost(); + VeniceClusterCreateOptions options = new 
VeniceClusterCreateOptions.Builder().numberOfControllers(1) + .numberOfServers(2) + .numberOfRouters(1) + .replicationFactor(2) + .partitionSize(100) + .sslToStorageNodes(true) + .sslToKafka(false) + .extraProperties(extraProperties) + .build(); + veniceCluster = ServiceFactory.getVeniceCluster(options); routerAddr = veniceCluster.getRandomRouterSslURL(); // Create test store @@ -123,7 +138,7 @@ public void setUp() throws VeniceClientException, ExecutionException, Interrupte veniceCluster.stopVeniceServer(veniceCluster.getVeniceServers().get(0).getPort()); } - @AfterClass(alwaysRun = true) + @AfterMethod(alwaysRun = true) public void cleanUp() { Utils.closeQuietlyWithErrorLogged(veniceCluster); Utils.closeQuietlyWithErrorLogged(veniceWriter); @@ -139,6 +154,7 @@ private void updateStore(long readQuota, int maxKeyLimit) { @Test(timeOut = 60000) public void testRouterRetry() throws ExecutionException, InterruptedException { + initCluster(); try (AvroGenericStoreClient storeClient = ClientFactory.getAndStartGenericAvroClient( ClientConfig.defaultGenericClientConfig(storeName) .setVeniceURL(routerAddr) @@ -214,4 +230,69 @@ public void testRouterRetry() throws ExecutionException, InterruptedException { 0.0, "Unhealthy request for compute streaming is unexpected"); } + + @Test(timeOut = 60000) + public void testRouterMultiGetRetryManager() throws ExecutionException, InterruptedException { + extraProperties.put(ConfigKeys.ROUTER_LONG_TAIL_RETRY_BUDGET_ENFORCEMENT_WINDOW_MS, "1000"); + extraProperties.put(ConfigKeys.ROUTER_SINGLE_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, "0.1"); + extraProperties.put(ConfigKeys.ROUTER_MULTI_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, "0.1"); + initCluster(); + try (AvroGenericStoreClient storeClient = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(routerAddr) + .setSslFactory(SslUtils.getVeniceLocalSslFactory()))) { + Set keySet = new HashSet<>(); + for (int i = 0; i < 
MAX_KEY_LIMIT - 1; ++i) { + keySet.add(KEY_PREFIX + i); + } + Map result = storeClient.batchGet(keySet).get(); + Assert.assertEquals(result.size(), MAX_KEY_LIMIT - 1); + // Retry manager should eventually be initialized for multi-get + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + double multiGetRetryLimit = MetricsUtils.getSum( + ".multi-key-long-tail-retry-manager-" + storeName + "--retry_limit_per_seconds.Gauge", + veniceCluster.getVeniceRouters()); + Assert.assertTrue(multiGetRetryLimit > 0); + }); + double multiGetRejectedRetry = MetricsUtils.getSum( + ".multi-key-long-tail-retry-manager-" + storeName + "--rejected_retry.OccurrenceRate", + veniceCluster.getVeniceRouters()); + double singleGetRetryLimit = MetricsUtils.getSum( + "single-key-long-tail-retry-manager-" + storeName + "--retry_limit_per_seconds.Gauge", + veniceCluster.getVeniceRouters()); + Assert.assertEquals(multiGetRejectedRetry, 0.0, "Rejected retry is unexpected"); + Assert.assertEquals(singleGetRetryLimit, 0.0, "Single-key retry manager shouldn't be initialized"); + } + } + + @Test(timeOut = 60000) + public void testRouterSingleGetRetryManager() throws ExecutionException, InterruptedException { + extraProperties.put(ConfigKeys.ROUTER_LONG_TAIL_RETRY_BUDGET_ENFORCEMENT_WINDOW_MS, "1000"); + extraProperties.put(ConfigKeys.ROUTER_SINGLE_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, "0.1"); + extraProperties.put(ConfigKeys.ROUTER_MULTI_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, "0.1"); + initCluster(); + try (AvroGenericStoreClient storeClient = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(routerAddr) + .setSslFactory(SslUtils.getVeniceLocalSslFactory()))) { + String key = KEY_PREFIX + 1; + GenericRecord result = storeClient.get(key).get(); + Assert.assertNotNull(result, "Value should not be null"); + // Retry manager should eventually be initialized for single-get + 
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + double singleGetRetryLimit = MetricsUtils.getSum( + ".single-key-long-tail-retry-manager-" + storeName + "--retry_limit_per_seconds.Gauge", + veniceCluster.getVeniceRouters()); + Assert.assertTrue(singleGetRetryLimit > 0); + }); + double singleGetRejectedRetry = MetricsUtils.getSum( + ".single-key-long-tail-retry-manager-" + storeName + "--rejected_retry.OccurrenceRate", + veniceCluster.getVeniceRouters()); + double multiGetRetryLimit = MetricsUtils.getSum( + "multi-key-long-tail-retry-manager-" + storeName + "--retry_limit_per_seconds.Gauge", + veniceCluster.getVeniceRouters()); + Assert.assertEquals(singleGetRejectedRetry, 0.0, "Rejected retry is unexpected"); + Assert.assertEquals(multiGetRetryLimit, 0.0, "Multi-key retry manager shouldn't be initialized"); + } + } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java index 6e7e79142ee..060d175bcb6 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java @@ -84,6 +84,7 @@ import com.linkedin.venice.stats.VeniceJVMStats; import com.linkedin.venice.stats.ZkClientStatusStats; import com.linkedin.venice.throttle.EventThrottler; +import com.linkedin.venice.utils.DaemonThreadFactory; import com.linkedin.venice.utils.HelixUtils; import com.linkedin.venice.utils.ReflectUtils; import com.linkedin.venice.utils.SslUtils; @@ -111,6 +112,8 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -126,6 +129,8 @@ public class 
RouterServer extends AbstractVeniceService { private static final Logger LOGGER = LogManager.getLogger(RouterServer.class); + private static final String ROUTER_RETRY_MANAGER_THREAD_PREFIX = "Router-retry-manager-thread"; + // Immutable state private final List serviceDiscoveryAnnouncers; private final MetricsRepository metricsRepository; @@ -194,6 +199,8 @@ public class RouterServer extends AbstractVeniceService { private final AggHostHealthStats aggHostHealthStats; + private ScheduledExecutorService retryManagerExecutorService; + public static void main(String args[]) throws Exception { if (args.length != 1) { Utils.exit("USAGE: java -jar venice-router-all.jar "); @@ -518,13 +525,20 @@ public boolean startInner() throws Exception { config.getClusterName(), compressorFactory, metricsRepository); + + retryManagerExecutorService = Executors.newScheduledThreadPool( + config.getRetryManagerCorePoolSize(), + new DaemonThreadFactory(ROUTER_RETRY_MANAGER_THREAD_PREFIX)); + VenicePathParser pathParser = new VenicePathParser( versionFinder, partitionFinder, routerStats, metadataRepository, config, - compressorFactory); + compressorFactory, + metricsRepository, + retryManagerExecutorService); MetaDataHandler metaDataHandler = new MetaDataHandler( routingDataRepository, @@ -863,6 +877,9 @@ public void stopInner() throws Exception { if (heartbeat != null) { heartbeat.stopInner(); } + if (retryManagerExecutorService != null) { + retryManagerExecutorService.shutdownNow(); + } } public HelixBaseRoutingRepository getRoutingDataRepository() { diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/VeniceRouterConfig.java b/services/venice-router/src/main/java/com/linkedin/venice/router/VeniceRouterConfig.java index 0c21496ba93..f40bb2e3376 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/VeniceRouterConfig.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/VeniceRouterConfig.java @@ -60,6 +60,7 @@ import 
static com.linkedin.venice.ConfigKeys.ROUTER_IO_WORKER_COUNT; import static com.linkedin.venice.ConfigKeys.ROUTER_LEAKED_FUTURE_CLEANUP_POLL_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.ROUTER_LEAKED_FUTURE_CLEANUP_THRESHOLD_MS; +import static com.linkedin.venice.ConfigKeys.ROUTER_LONG_TAIL_RETRY_BUDGET_ENFORCEMENT_WINDOW_MS; import static com.linkedin.venice.ConfigKeys.ROUTER_LONG_TAIL_RETRY_FOR_BATCH_GET_THRESHOLD_MS; import static com.linkedin.venice.ConfigKeys.ROUTER_LONG_TAIL_RETRY_FOR_SINGLE_GET_THRESHOLD_MS; import static com.linkedin.venice.ConfigKeys.ROUTER_LONG_TAIL_RETRY_MAX_ROUTE_FOR_MULTI_KEYS_REQ; @@ -71,6 +72,7 @@ import static com.linkedin.venice.ConfigKeys.ROUTER_MAX_READ_CAPACITY; import static com.linkedin.venice.ConfigKeys.ROUTER_META_STORE_SHADOW_READ_ENABLED; import static com.linkedin.venice.ConfigKeys.ROUTER_MULTIGET_TARDY_LATENCY_MS; +import static com.linkedin.venice.ConfigKeys.ROUTER_MULTI_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL; import static com.linkedin.venice.ConfigKeys.ROUTER_MULTI_KEY_ROUTING_STRATEGY; import static com.linkedin.venice.ConfigKeys.ROUTER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS; import static com.linkedin.venice.ConfigKeys.ROUTER_PENDING_CONNECTION_RESUME_THRESHOLD_PER_ROUTE; @@ -80,7 +82,9 @@ import static com.linkedin.venice.ConfigKeys.ROUTER_QUOTA_CHECK_WINDOW; import static com.linkedin.venice.ConfigKeys.ROUTER_READ_QUOTA_THROTTLING_LEASE_TIMEOUT_MS; import static com.linkedin.venice.ConfigKeys.ROUTER_RESOLVE_BEFORE_SSL; +import static com.linkedin.venice.ConfigKeys.ROUTER_RETRY_MANAGER_CORE_POOL_SIZE; import static com.linkedin.venice.ConfigKeys.ROUTER_SINGLEGET_TARDY_LATENCY_MS; +import static com.linkedin.venice.ConfigKeys.ROUTER_SINGLE_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL; import static com.linkedin.venice.ConfigKeys.ROUTER_SMART_LONG_TAIL_RETRY_ABORT_THRESHOLD_MS; import static com.linkedin.venice.ConfigKeys.ROUTER_SMART_LONG_TAIL_RETRY_ENABLED; import static 
com.linkedin.venice.ConfigKeys.ROUTER_SOCKET_TIMEOUT; @@ -215,6 +219,11 @@ public class VeniceRouterConfig { private boolean httpClientOpensslEnabled; private String identityParserClassName; + private double singleKeyLongTailRetryBudgetPercentDecimal; + private double multiKeyLongTailRetryBudgetPercentDecimal; + private long longTailRetryBudgetEnforcementWindowInMs; + private int retryManagerCorePoolSize; + public VeniceRouterConfig(VeniceProperties props) { try { checkProperties(props); @@ -393,6 +402,13 @@ private void checkProperties(VeniceProperties props) { perStoreRouterQuotaBuffer = props.getDouble(ROUTER_PER_STORE_ROUTER_QUOTA_BUFFER, 1.5); httpClientOpensslEnabled = props.getBoolean(ROUTER_HTTP_CLIENT_OPENSSL_ENABLED, true); identityParserClassName = props.getString(IDENTITY_PARSER_CLASS, DefaultIdentityParser.class.getName()); + singleKeyLongTailRetryBudgetPercentDecimal = + props.getDouble(ROUTER_SINGLE_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, 0.0); + multiKeyLongTailRetryBudgetPercentDecimal = + props.getDouble(ROUTER_MULTI_KEY_LONG_TAIL_RETRY_BUDGET_PERCENT_DECIMAL, 0.0); + longTailRetryBudgetEnforcementWindowInMs = + props.getLong(ROUTER_LONG_TAIL_RETRY_BUDGET_ENFORCEMENT_WINDOW_MS, Time.MS_PER_MINUTE); + retryManagerCorePoolSize = props.getInt(ROUTER_RETRY_MANAGER_CORE_POOL_SIZE, 5); } public double getPerStoreRouterQuotaBuffer() { @@ -843,4 +859,20 @@ public boolean isHttpClientOpensslEnabled() { public String getIdentityParserClassName() { return identityParserClassName; } + + public double getSingleKeyLongTailRetryBudgetPercentDecimal() { + return singleKeyLongTailRetryBudgetPercentDecimal; + } + + public double getMultiKeyLongTailRetryBudgetPercentDecimal() { + return multiKeyLongTailRetryBudgetPercentDecimal; + } + + public long getLongTailRetryBudgetEnforcementWindowInMs() { + return longTailRetryBudgetEnforcementWindowInMs; + } + + public int getRetryManagerCorePoolSize() { + return retryManagerCorePoolSize; + } } diff --git 
a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java index 19455937b1d..d543cc95fc2 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java @@ -261,7 +261,6 @@ public , K, R> Scatter scatter( INTERNAL_SERVER_ERROR, "Ready-to-serve host must be an 'Instance'"); } - Instance veniceInstance = (Instance) host; if (!venicePath.isRetryRequest()) { /** @@ -286,12 +285,16 @@ public , K, R> Scatter scatter( "Quota exceeded for '" + storeName + "' while serving a " + venicePath.getRequestType() + " request! msg: " + e.getMessage()); } + // Only record route(s) of the original request for retry manager purposes. + venicePath.recordRequest(); } } if (venicePath.isRetryRequest()) { - // Check whether the retry request is allowed or not according to the max allowed retry route config - if (!venicePath.isLongTailRetryAllowedForNewRoute()) { + // Check whether the retry request is allowed or not according to the max allowed retry route config and retry + // manager's retry budget. Retry is only allowed if both conditions are true. 
+ if (!venicePath.isLongTailRetryAllowedForNewRequest() + || !venicePath.isLongTailRetryWithinBudget(onlineRequestNum)) { routerStats.getStatsByType(venicePath.getRequestType()).recordDisallowedRetryRequest(storeName); throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( Optional.of(storeName), diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java index d623188cdda..1d2e40fac71 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java @@ -18,7 +18,9 @@ import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.exceptions.VeniceStoreIsMigratedException; import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreDataChangedListener; import com.linkedin.venice.meta.Version; import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.VeniceRouterConfig; @@ -32,11 +34,15 @@ import com.linkedin.venice.router.streaming.VeniceChunkedWriteHandler; import com.linkedin.venice.router.utils.VeniceRouterUtils; import com.linkedin.venice.streaming.StreamingUtils; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.netty.channel.ChannelHandlerContext; +import io.tehuti.metrics.MetricsRepository; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; @@ -92,12 +98,27 @@ public class VenicePathParser public static final String TYPE_BLOB_DISCOVERY = 
RouterResourceType.TYPE_BLOB_DISCOVERY.toString(); + private static final String SINGLE_KEY_RETRY_MANAGER_STATS_PREFIX = "single-key-long-tail-retry-manager-"; + private static final String MULTI_KEY_RETRY_MANAGER_STATS_PREFIX = "multi-key-long-tail-retry-manager-"; + private final VeniceVersionFinder versionFinder; private final VenicePartitionFinder partitionFinder; private final RouterStats routerStats; private final ReadOnlyStoreRepository storeRepository; private final VeniceRouterConfig routerConfig; private final CompressorFactory compressorFactory; + private final MetricsRepository metricsRepository; + private final ScheduledExecutorService retryManagerScheduler; + private final Map routerSingleKeyRetryManagers; + private final Map routerMultiKeyRetryManagers; + + private final StoreDataChangedListener storeChangedListener = new StoreDataChangedListener() { + @Override + public void handleStoreDeleted(String storeName) { + routerSingleKeyRetryManagers.remove(storeName); + routerMultiKeyRetryManagers.remove(storeName); + } + }; public VenicePathParser( VeniceVersionFinder versionFinder, @@ -105,13 +126,20 @@ public VenicePathParser( RouterStats routerStats, ReadOnlyStoreRepository storeRepository, VeniceRouterConfig routerConfig, - CompressorFactory compressorFactory) { + CompressorFactory compressorFactory, + MetricsRepository metricsRepository, + ScheduledExecutorService retryManagerScheduler) { this.versionFinder = versionFinder; this.partitionFinder = partitionFinder; this.routerStats = routerStats; this.storeRepository = storeRepository; + this.storeRepository.registerStoreDataChangedListener(storeChangedListener); this.routerConfig = routerConfig; this.compressorFactory = compressorFactory; + this.metricsRepository = metricsRepository; + this.retryManagerScheduler = retryManagerScheduler; + this.routerSingleKeyRetryManagers = new VeniceConcurrentHashMap<>(); + this.routerMultiKeyRetryManagers = new VeniceConcurrentHashMap<>(); }; @Override @@ -152,6 
+180,14 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout String method = fullHttpRequest.method().name(); if (VeniceRouterUtils.isHttpGet(method)) { + RetryManager singleKeyRetryManager = routerSingleKeyRetryManagers.computeIfAbsent( + storeName, + ignored -> new RetryManager( + metricsRepository, + SINGLE_KEY_RETRY_MANAGER_STATS_PREFIX + storeName, + routerConfig.getLongTailRetryBudgetEnforcementWindowInMs(), + routerConfig.getSingleKeyLongTailRetryBudgetPercentDecimal(), + retryManagerScheduler)); // single-get request path = new VeniceSingleGetPath( storeName, @@ -160,8 +196,17 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout pathHelper.getKey(), uri, partitionFinder, - routerStats); + routerStats, + singleKeyRetryManager); } else if (VeniceRouterUtils.isHttpPost(method)) { + RetryManager multiKeyRetryManager = routerMultiKeyRetryManagers.computeIfAbsent( + storeName, + ignored -> new RetryManager( + metricsRepository, + MULTI_KEY_RETRY_MANAGER_STATS_PREFIX + storeName, + routerConfig.getLongTailRetryBudgetEnforcementWindowInMs(), + routerConfig.getMultiKeyLongTailRetryBudgetPercentDecimal(), + retryManagerScheduler)); if (resourceType == RouterResourceType.TYPE_STORAGE) { // multi-get request path = new VeniceMultiGetPath( @@ -174,7 +219,8 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout routerConfig.isSmartLongTailRetryEnabled(), routerConfig.getSmartLongTailRetryAbortThresholdMs(), routerStats, - routerConfig.getLongTailRetryMaxRouteForMultiKeyReq()); + routerConfig.getLongTailRetryMaxRouteForMultiKeyReq(), + multiKeyRetryManager); path.setResponseHeaders( Collections.singletonMap( HttpConstants.VENICE_CLIENT_COMPUTE, @@ -190,7 +236,8 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout getBatchGetLimit(storeName), routerConfig.isSmartLongTailRetryEnabled(), routerConfig.getSmartLongTailRetryAbortThresholdMs(), - 
routerConfig.getLongTailRetryMaxRouteForMultiKeyReq()); + routerConfig.getLongTailRetryMaxRouteForMultiKeyReq(), + multiKeyRetryManager); if (storeRepository.isReadComputationEnabled(storeName)) { path = computePath; diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java index a9f02458df8..c57243b15ab 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java @@ -10,6 +10,7 @@ import com.linkedin.venice.HttpConstants; import com.linkedin.venice.compute.protocol.request.ComputeRequestV3; import com.linkedin.venice.compute.protocol.request.router.ComputeRouterRequestKeyV1; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.read.RequestType; import com.linkedin.venice.read.protocol.request.router.MultiGetRouterRequestKeyV1; import com.linkedin.venice.router.api.RouterExceptionAndTrackingUtils; @@ -90,14 +91,16 @@ public VeniceComputePath( int maxKeyCount, boolean smartLongTailRetryEnabled, int smartLongTailRetryAbortThresholdMs, - int longTailRetryMaxRouteForMultiKeyReq) throws RouterException { + int longTailRetryMaxRouteForMultiKeyReq, + RetryManager retryManager) throws RouterException { super( storeName, versionNumber, resourceName, smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, - longTailRetryMaxRouteForMultiKeyReq); + longTailRetryMaxRouteForMultiKeyReq, + retryManager); this.valueSchemaIdHeader = request.headers().get(VENICE_COMPUTE_VALUE_SCHEMA_ID, "-1"); @@ -152,7 +155,8 @@ private VeniceComputePath( String computeRequestVersionHeader, boolean smartLongTailRetryEnabled, int smartLongTailRetryAbortThresholdMs, - int longTailRetryMaxRouteForMultiKeyReq) { + int longTailRetryMaxRouteForMultiKeyReq, + RetryManager 
retryManager) { super( storeName, versionNumber, @@ -160,7 +164,8 @@ private VeniceComputePath( smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, routerKeyMap, - longTailRetryMaxRouteForMultiKeyReq); + longTailRetryMaxRouteForMultiKeyReq, + retryManager); this.requestContent = requestContent; this.valueSchemaIdHeader = valueSchemaIdHeader; this.computeRequestLengthInBytes = computeRequestLengthInBytes; @@ -202,7 +207,8 @@ public VeniceMultiGetPath toMultiGetPath() { newRouterKeyMap, isSmartLongTailRetryEnabled(), getSmartLongTailRetryAbortThresholdMs(), - getLongTailRetryMaxRouteForMultiKeyReq()); + getLongTailRetryMaxRouteForMultiKeyReq(), + retryManager); newPath.setupRetryRelatedInfo(this); return newPath; } @@ -227,7 +233,8 @@ protected VeniceComputePath fixRetryRequestForSubPath(Map stats, - int longTailRetryMaxRouteForMultiKeyReq) throws RouterException { + int longTailRetryMaxRouteForMultiKeyReq, + RetryManager retryManager) throws RouterException { super( storeName, versionNumber, resourceName, smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, - longTailRetryMaxRouteForMultiKeyReq); + longTailRetryMaxRouteForMultiKeyReq, + retryManager); // Validate API version int apiVersion = Integer.parseInt(request.headers().get(HttpConstants.VENICE_API_VERSION)); @@ -89,7 +92,8 @@ public VeniceMultiGetPath( Map routerKeyMap, boolean smartLongTailRetryEnabled, int smartLongTailRetryAbortThresholdMs, - int longTailRetryMaxRouteForMultiKeyReq) { + int longTailRetryMaxRouteForMultiKeyReq, + RetryManager retryManager) { super( storeName, versionNumber, @@ -97,7 +101,8 @@ public VeniceMultiGetPath( smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, routerKeyMap, - longTailRetryMaxRouteForMultiKeyReq); + longTailRetryMaxRouteForMultiKeyReq, + retryManager); setPartitionKeys(routerKeyMap.keySet()); } @@ -132,7 +137,8 @@ protected VeniceMultiGetPath fixRetryRequestForSubPath(Map(), - longTailRetryMaxRouteForMultiKeyReq); + 
longTailRetryMaxRouteForMultiKeyReq, + retryManager); } public VeniceMultiKeyPath( @@ -58,8 +61,15 @@ public VeniceMultiKeyPath( boolean smartLongTailRetryEnabled, int smartLongTailRetryAbortThresholdMs, Map routerKeyMap, - int longTailRetryMaxRouteForMultiKeyReq) { - super(storeName, versionNumber, resourceName, smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs); + int longTailRetryMaxRouteForMultiKeyReq, + RetryManager retryManager) { + super( + storeName, + versionNumber, + resourceName, + smartLongTailRetryEnabled, + smartLongTailRetryAbortThresholdMs, + retryManager); this.keyNum = routerKeyMap.size(); this.routerKeyMap = routerKeyMap; this.longTailRetryMaxRouteForMultiKeyReq = longTailRetryMaxRouteForMultiKeyReq; @@ -229,7 +239,7 @@ public int getLongTailRetryMaxRouteForMultiKeyReq() { } @Override - public boolean isLongTailRetryAllowedForNewRoute() { + public boolean isLongTailRetryAllowedForNewRequest() { if (longTailRetryMaxRouteForMultiKeyReq == -1) { // feature is disabled return true; diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VenicePath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VenicePath.java index 5f45f8373e7..8be66cec541 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VenicePath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VenicePath.java @@ -3,6 +3,7 @@ import com.linkedin.alpini.router.api.ResourcePath; import com.linkedin.venice.HttpConstants; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.api.RouterKey; import com.linkedin.venice.router.api.VeniceResponseDecompressor; @@ -33,6 +34,7 @@ public abstract class VenicePath implements ResourcePath { private Collection partitionKeys; protected final String storeName; protected final int versionNumber; + 
protected final RetryManager retryManager; private final Time time; private boolean retryRequest = false; private final boolean smartLongTailRetryEnabled; @@ -65,14 +67,16 @@ public VenicePath( int versionNumber, String resourceName, boolean smartLongTailRetryEnabled, - int smartLongTailRetryAbortThresholdMs) { + int smartLongTailRetryAbortThresholdMs, + RetryManager retryManager) { this( storeName, versionNumber, resourceName, smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, - new SystemTime()); + new SystemTime(), + retryManager); } public VenicePath( @@ -81,13 +85,15 @@ public VenicePath( String resourceName, boolean smartLongTailRetryEnabled, int smartLongTailRetryAbortThresholdMs, - Time time) { + Time time, + RetryManager retryManager) { this.resourceName = resourceName; this.storeName = storeName; this.versionNumber = versionNumber; this.smartLongTailRetryEnabled = smartLongTailRetryEnabled; this.smartLongTailRetryAbortThresholdMs = smartLongTailRetryAbortThresholdMs; this.time = time; + this.retryManager = retryManager; } public synchronized long getRequestId() { @@ -333,7 +339,7 @@ public boolean isStreamingRequest() { return getChunkedResponse() != null; } - public boolean isLongTailRetryAllowedForNewRoute() { + public boolean isLongTailRetryAllowedForNewRequest() { return true; } @@ -354,4 +360,12 @@ protected RequestType getStreamingRequestType() { public abstract byte[] getBody(); public abstract String getVeniceApiVersionHeader(); + + public void recordRequest() { + retryManager.recordRequest(); + } + + public boolean isLongTailRetryWithinBudget(int numberOfRoutes) { + return retryManager.isRetryAllowed(numberOfRoutes); + } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java index 0d39ca3592f..94a1bd0495d 100644 --- 
a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java @@ -8,6 +8,7 @@ import com.linkedin.alpini.router.api.RouterException; import com.linkedin.venice.RequestConstants; import com.linkedin.venice.exceptions.VeniceNoHelixResourceException; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.api.RouterExceptionAndTrackingUtils; import com.linkedin.venice.router.api.RouterKey; @@ -41,8 +42,9 @@ public VeniceSingleGetPath( String key, String uri, VenicePartitionFinder partitionFinder, - RouterStats stats) throws RouterException { - super(storeName, versionNumber, resourceName, false, -1); + RouterStats stats, + RetryManager retryManager) throws RouterException { + super(storeName, versionNumber, resourceName, false, -1, retryManager); if (StringUtils.isEmpty(key)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( Optional.empty(), diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceDelegateMode.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceDelegateMode.java index cba2c5da18c..e558337e713 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceDelegateMode.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceDelegateMode.java @@ -5,6 +5,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; @@ -26,6 +27,7 @@ import com.linkedin.venice.helix.HelixInstanceConfigRepository; import com.linkedin.venice.meta.Instance; import 
com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.meta.Version; import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.VeniceRouterConfig; @@ -60,6 +62,8 @@ public class TestVeniceDelegateMode { + private RetryManager retryManager; + private VenicePath getVenicePath( String storeName, int version, @@ -76,7 +80,8 @@ private VenicePath getVenicePath( RequestType requestType, List keys, Set slowStorageNodeSet) { - return new VenicePath(storeName, version, resourceName, false, -1) { + retryManager = mock(RetryManager.class); + return new VenicePath(storeName, version, resourceName, false, -1, retryManager) { private final String ROUTER_REQUEST_VERSION = Integer.toString(ReadAvroProtocolDefinition.SINGLE_GET_ROUTER_REQUEST_V1.getProtocolVersion()); @@ -278,6 +283,21 @@ public void testScatterWithSingleGet() throws RouterException { Assert.assertEquals(hosts.size(), 1, "There should be only one chose host"); Instance selectedHost = hosts.get(0); Assert.assertTrue(instanceList.contains(selectedHost)); + verify(retryManager, times(1)).recordRequest(); + verify(retryManager, never()).isRetryAllowed(anyInt()); + + // Verify retry manager behavior for retry request + path = getVenicePath(storeName, version, resourceName, RequestType.SINGLE_GET, keys); + doReturn(true).when(retryManager).isRetryAllowed(anyInt()); + path.setRetryRequest(); + scatter = new Scatter(path, getPathParser(), VeniceRole.REPLICA); + RouterStats routerStats = mock(RouterStats.class); + doReturn(mock(AggRouterHttpRequestStats.class)).when(routerStats).getStatsByType(any()); + scatterMode = new VeniceDelegateMode(config, routerStats, mock(RouteHttpRequestStats.class)); + scatterMode.initReadRequestThrottler(getReadRequestThrottle(false)); + scatterMode.scatter(scatter, requestMethod, resourceName, partitionFinder, hostFinder, monitor, VeniceRole.REPLICA); + verify(retryManager, never()).recordRequest(); + 
verify(retryManager, times(1)).isRetryAllowed(anyInt()); } @Test(expectedExceptions = RouterException.class, expectedExceptionsMessageRegExp = ".*not available to serve request of type: SINGLE_GET") @@ -487,6 +507,21 @@ public void testScatterWithMultiGet() throws RouterException { Assert.assertTrue( instanceSet.contains(instance3) || instanceSet.contains(instance5), "One of instance3/instance5 should be selected"); + verify(retryManager, times(3)).recordRequest(); + verify(retryManager, never()).isRetryAllowed(anyInt()); + + // Verify retry manager behavior for retry request + path = getVenicePath(storeName, version, resourceName, RequestType.MULTI_GET, keys); + doReturn(true).when(retryManager).isRetryAllowed(anyInt()); + path.setRetryRequest(); + scatter = new Scatter(path, getPathParser(), VeniceRole.REPLICA); + RouterStats routerStats = mock(RouterStats.class); + doReturn(mock(AggRouterHttpRequestStats.class)).when(routerStats).getStatsByType(any()); + scatterMode = new VeniceDelegateMode(config, routerStats, mock(RouteHttpRequestStats.class)); + scatterMode.initReadRequestThrottler(getReadRequestThrottle(false)); + scatterMode.scatter(scatter, requestMethod, resourceName, partitionFinder, hostFinder, monitor, VeniceRole.REPLICA); + verify(retryManager, never()).recordRequest(); + verify(retryManager, atLeastOnce()).isRetryAllowed(eq(3)); } @Test diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java index c036454053d..b86f169b50a 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; import org.mockito.Mockito; import 
org.testng.Assert; import org.testng.annotations.AfterClass; @@ -126,7 +127,9 @@ public void testParseResourceUri_ComputeRequest() throws RouterException { getMockedStats(), storeRepository, mock(VeniceRouterConfig.class), - mock(CompressorFactory.class)); + mock(CompressorFactory.class), + mock(MetricsRepository.class), + mock(ScheduledExecutorService.class)); String storeName = "test-store"; String uri = "storage/" + storeName; @@ -186,7 +189,9 @@ public void parsesQueries() throws RouterException { getMockedStats(), mock(ReadOnlyStoreRepository.class), MOCK_ROUTER_CONFIG, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class), + mock(ScheduledExecutorService.class)); BasicFullHttpRequest request = new BasicFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri, 0, 0); VenicePath path = parser.parseResourceUri(uri, request); String keyB64 = Base64.getEncoder().encodeToString("key".getBytes()); @@ -215,7 +220,9 @@ public void parsesB64Uri() throws RouterException { getMockedStats(), mock(ReadOnlyStoreRepository.class), MOCK_ROUTER_CONFIG, - compressorFactory).parseResourceUri(myUri, request); + compressorFactory, + mock(MetricsRepository.class), + mock(ScheduledExecutorService.class)).parseResourceUri(myUri, request); ByteBuffer partitionKey = path.getPartitionKey().getKeyBuffer(); Assert.assertEquals( path.getPartitionKey().getKeyBuffer(), @@ -234,7 +241,9 @@ public void failsToParseOtherActions() throws RouterException { getMockedStats(), mock(ReadOnlyStoreRepository.class), MOCK_ROUTER_CONFIG, - compressorFactory).parseResourceUri("/badAction/storeName/key"); + compressorFactory, + mock(MetricsRepository.class), + mock(ScheduledExecutorService.class)).parseResourceUri("/badAction/storeName/key"); } @Test @@ -279,7 +288,9 @@ public void parseRequestWithBatchSizeViolation() throws RouterException { mockRouterStats, storeRepository, MOCK_ROUTER_CONFIG, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class), + 
mock(ScheduledExecutorService.class)); try { pathParser.parseResourceUri(myUri, request); fail("A RouterException should be thrown here"); diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceComputePath.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceComputePath.java index aafd216a228..494f3031421 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceComputePath.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceComputePath.java @@ -16,6 +16,7 @@ import com.linkedin.venice.compute.protocol.request.CosineSimilarity; import com.linkedin.venice.compute.protocol.request.DotProduct; import com.linkedin.venice.compute.protocol.request.enums.ComputeOperationType; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.partitioner.VenicePartitioner; import com.linkedin.venice.router.api.VenicePartitionFinder; import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition; @@ -126,7 +127,8 @@ public void testDeserializationCorrectness() throws RouterException { 10, false, -1, - 1); + 1, + mock(RetryManager.class)); Assert.assertEquals(computePath.getComputeRequestLengthInBytes(), expectedLength); } } @@ -215,7 +217,8 @@ void testToMultiGetPath() throws RouterException { smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, null, - longTailRetryMaxRouteForMultiKeyReq); + longTailRetryMaxRouteForMultiKeyReq, + mock(RetryManager.class)); byte[] serializedMultiGetRequest = multiGetPath.serializeRouterRequest(); VeniceComputePath computePath = new VeniceComputePath( @@ -227,7 +230,8 @@ void testToMultiGetPath() throws RouterException { maxKeyCount, smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs, - longTailRetryMaxRouteForMultiKeyReq); + longTailRetryMaxRouteForMultiKeyReq, + mock(RetryManager.class)); VeniceMultiGetPath syntheticMultiGetPath = 
computePath.toMultiGetPath(); Assert.assertEquals(syntheticMultiGetPath.serializeRouterRequest(), serializedMultiGetRequest); Assert diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceMultiGetPath.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceMultiGetPath.java index dbf0b7d733f..3a0cbc98f04 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceMultiGetPath.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVeniceMultiGetPath.java @@ -9,6 +9,7 @@ import com.linkedin.alpini.router.api.RouterException; import com.linkedin.venice.HttpConstants; import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.partitioner.VenicePartitioner; import com.linkedin.venice.router.RouterThrottleHandler; import com.linkedin.venice.router.api.RouterExceptionAndTrackingUtils; @@ -35,6 +36,9 @@ public class TestVeniceMultiGetPath { + private final RetryManager disabledRetryManager = + new RetryManager(new MetricsRepository(), "disabled-test-retry-manager", 0, 0, null); + @BeforeClass public void setUp() { RouterExceptionAndTrackingUtils.setRouterStats( @@ -115,7 +119,8 @@ public void testMultiGetReqWithTooManyKeys() throws RouterException { false, -1, null, - 1); + 1, + disabledRetryManager); } @Test(expectedExceptions = RouterException.class, expectedExceptionsMessageRegExp = ".*but received.*") @@ -143,7 +148,8 @@ public void testMultiGetReqWithInvalidAPIVersion() throws RouterException { false, -1, null, - 1); + 1, + disabledRetryManager); } @Test @@ -162,7 +168,6 @@ public void testAllowedRouteRetry() throws RouterException { BasicFullHttpRequest request = getMultiGetHttpRequest(resourceName, keys, Optional.empty()); request.attr(RouterThrottleHandler.THROTTLE_HANDLER_BYTE_ATTRIBUTE_KEY) .set(multiGetRequestSerializer.serializeObjects(keys)); - 
VenicePath path = new VeniceMultiGetPath( storeName, version, @@ -173,9 +178,10 @@ public void testAllowedRouteRetry() throws RouterException { false, -1, null, - 1); - Assert.assertTrue(path.isLongTailRetryAllowedForNewRoute()); - Assert.assertFalse(path.isLongTailRetryAllowedForNewRoute()); + 1, + disabledRetryManager); + Assert.assertTrue(path.isLongTailRetryAllowedForNewRequest()); + Assert.assertFalse(path.isLongTailRetryAllowedForNewRequest()); request = getMultiGetHttpRequest(resourceName, keys, Optional.empty()); request.attr(RouterThrottleHandler.THROTTLE_HANDLER_BYTE_ATTRIBUTE_KEY) @@ -190,8 +196,9 @@ public void testAllowedRouteRetry() throws RouterException { false, -1, null, - -1); - Assert.assertTrue(path.isLongTailRetryAllowedForNewRoute()); - Assert.assertTrue(path.isLongTailRetryAllowedForNewRoute()); + -1, + disabledRetryManager); + Assert.assertTrue(path.isLongTailRetryAllowedForNewRequest()); + Assert.assertTrue(path.isLongTailRetryAllowedForNewRequest()); } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVenicePath.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVenicePath.java index 4e6c5962906..da07b2f9262 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVenicePath.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/path/TestVenicePath.java @@ -1,18 +1,30 @@ package com.linkedin.venice.router.api.path; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import com.linkedin.alpini.base.concurrency.Executors; +import com.linkedin.venice.meta.RetryManager; import com.linkedin.venice.read.RequestType; import com.linkedin.venice.router.api.RouterKey; import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition; import com.linkedin.venice.utils.TestMockTime; +import 
com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; +import io.tehuti.metrics.MetricsRepository; +import java.time.Clock; import java.util.Collection; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.apache.http.client.methods.HttpUriRequest; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -25,8 +37,8 @@ private static class SmartRetryVenicePath extends VenicePath { private final String ROUTER_REQUEST_VERSION = Integer.toString(ReadAvroProtocolDefinition.SINGLE_GET_ROUTER_REQUEST_V1.getProtocolVersion()); - public SmartRetryVenicePath(Time time) { - super("fake_resource", 1, "fake_resource_v1", true, SMART_LONG_TAIL_RETRY_ABORT_THRESHOLD_MS, time); + public SmartRetryVenicePath(Time time, RetryManager retryManager) { + super("fake_resource", 1, "fake_resource_v1", true, SMART_LONG_TAIL_RETRY_ABORT_THRESHOLD_MS, time, retryManager); } @Override @@ -70,18 +82,36 @@ public String getLocation() { } } + private RetryManager disabledRetryManager; + private MetricsRepository metricsRepository; + + private final ScheduledExecutorService retryManagerScheduler = Executors.newScheduledThreadPool(1); + + @BeforeMethod + public void setUp() { + metricsRepository = new MetricsRepository(); + // retry manager is disabled by default + disabledRetryManager = + new RetryManager(metricsRepository, "disabled-test-retry-manager", 0, 0, retryManagerScheduler); + } + + @AfterClass + public void cleanUp() { + retryManagerScheduler.shutdownNow(); + } + @Test public void testRetryAbortBecauseOfTimeConstraint() { TestMockTime time = new TestMockTime(); time.setTime(1); - SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath orgPath = new 
SmartRetryVenicePath(time, disabledRetryManager); orgPath.setLongTailRetryThresholdMs(20); assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE1)); assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE2)); orgPath.recordOriginalRequestStartTimestamp(); orgPath.markStorageNodeAsFast(STORAGE_NODE1); - SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time, disabledRetryManager); retryPath.setRetryRequest(); retryPath.setupRetryRelatedInfo(orgPath); time.sleep(SMART_LONG_TAIL_RETRY_ABORT_THRESHOLD_MS + orgPath.getLongTailRetryThresholdMs() + 1); @@ -93,7 +123,7 @@ public void testRetryAbortBecauseOfTimeConstraint() { public void testRetryAbortBecauseOfSlowStorageNode() { TestMockTime time = new TestMockTime(); time.setTime(1); - SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time, disabledRetryManager); orgPath.setLongTailRetryThresholdMs(20); assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE1)); orgPath.requestStorageNode(STORAGE_NODE1); @@ -103,7 +133,7 @@ public void testRetryAbortBecauseOfSlowStorageNode() { assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE2)); orgPath.recordOriginalRequestStartTimestamp(); - SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time, disabledRetryManager); retryPath.setRetryRequest(); retryPath.setupRetryRelatedInfo(orgPath); time.sleep(1); @@ -116,12 +146,12 @@ public void testRetryAbortBecauseOfSlowStorageNode() { public void testSlowNodeIgnoredWhen5XXcodeReturned() { TestMockTime time = new TestMockTime(); time.setTime(1); - SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time, disabledRetryManager); orgPath.setLongTailRetryThresholdMs(20); assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE1)); 
orgPath.requestStorageNode(STORAGE_NODE1); - SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time, disabledRetryManager); retryPath.setupRetryRelatedInfo(orgPath); retryPath.setRetryRequest(HttpResponseStatus.BAD_REQUEST); time.sleep(1); @@ -138,14 +168,14 @@ public void testSlowNodeIgnoredWhen5XXcodeReturned() { public void testRetryLogicWhenMetBothCriteria() { TestMockTime time = new TestMockTime(); time.setTime(1); - SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time); + SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time, disabledRetryManager); orgPath.setLongTailRetryThresholdMs(20); assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE1)); assertTrue(orgPath.canRequestStorageNode(STORAGE_NODE2)); orgPath.recordOriginalRequestStartTimestamp(); orgPath.markStorageNodeAsFast(STORAGE_NODE1); - SmartRetryVenicePath retryPath1 = new SmartRetryVenicePath(time); + SmartRetryVenicePath retryPath1 = new SmartRetryVenicePath(time, disabledRetryManager); retryPath1.setRetryRequest(); retryPath1.setupRetryRelatedInfo(orgPath); time.sleep(1); @@ -153,10 +183,43 @@ public void testRetryLogicWhenMetBothCriteria() { assertTrue(retryPath1.canRequestStorageNode(STORAGE_NODE1)); // Retry to an unknown storage node - SmartRetryVenicePath retryPath2 = new SmartRetryVenicePath(time); + SmartRetryVenicePath retryPath2 = new SmartRetryVenicePath(time, disabledRetryManager); retryPath2.setRetryRequest(); retryPath2.setupRetryRelatedInfo(orgPath); assertFalse(retryPath2.isRetryRequestTooLate()); assertTrue(retryPath2.canRequestStorageNode(STORAGE_NODE1)); } + + @Test + public void testRetryManager() { + Clock mockClock = mock(Clock.class); + TestMockTime time = new TestMockTime(); + long start = System.currentTimeMillis(); + time.setTime(start); + doReturn(start).when(mockClock).millis(); + RetryManager retryManager = + new RetryManager(metricsRepository, "test-retry-manager", 1000, 0.1, 
mockClock, retryManagerScheduler); + retryManager.recordRequest(); + doReturn(start + 1000).when(mockClock).millis(); + // The retry budget should be set to: ceiling(1*0.1) = 1 and the token bucket capacity should be + // 5 (1 * TOKEN_BUCKET_CAPACITY_MULTIPLE) + TestUtils.waitForNonDeterministicAssertion( + 3, + TimeUnit.SECONDS, + () -> Assert.assertNotNull(retryManager.getRetryTokenBucket())); + SmartRetryVenicePath orgPath = new SmartRetryVenicePath(time, retryManager); + orgPath.setLongTailRetryThresholdMs(20); + for (int i = 0; i < 6; i++) { + SmartRetryVenicePath retryPath = new SmartRetryVenicePath(time, retryManager); + retryPath.setRetryRequest(); + retryPath.setupRetryRelatedInfo(orgPath); + if (i < 5) { + assertTrue(retryPath.isLongTailRetryAllowedForNewRequest()); + assertTrue(retryPath.isLongTailRetryWithinBudget(1)); + } else { + assertTrue(retryPath.isLongTailRetryAllowedForNewRequest()); + assertFalse(retryPath.isLongTailRetryWithinBudget(1)); + } + } + } }