From eed157b97e5a757032a7c87ae9fcf213d7346b80 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Thu, 14 Mar 2024 11:32:09 -0700 Subject: [PATCH] [server] Extend retry for new superset schema fetching. (#897) Previously, when a user added a new value schema to an AAWC store, the ongoing ingestion was notified by the ZK store to refresh the schema. The previous retry policy for fetching the new schema from ZK was 3 attempts with a fixed 100ms delay, which is very short. If the new schema cannot be fetched, the store ingestion could fail. In a real production system, the server usually needs to wait 3~5 seconds after a new schema is registered before it can fetch that schema. This change uses exponential backoff retry instead. Co-authored-by: Hao Xu --- .../venice/helix/HelixReadOnlySchemaRepository.java | 13 +++++++++++-- .../helix/HelixReadOnlySchemaRepositoryTest.java | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepository.java index 6d05bda21fe..ad655a36b66 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepository.java @@ -163,7 +163,10 @@ SchemaEntry forceRefreshSupersetSchemaWithRetry(String storeName) { Store store = getStoreRepository().getStore(storeName); int supersetSchemaId = store.getLatestSuperSetValueSchemaId(); AtomicReference supersetSchemaEntry = new AtomicReference<>(); - RetryUtils.executeWithMaxAttempt(() -> { + long currentTimestamp = System.currentTimeMillis(); + List> retriableExceptions = + Collections.singletonList(InvalidVeniceSchemaException.class); + RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> { try { getSchemaLock().writeLock().lock(); SchemaData schemaData = getSchemaMap().get(storeName); @@ -176,7 +179,13 @@ SchemaEntry 
forceRefreshSupersetSchemaWithRetry(String storeName) { } finally { getSchemaLock().writeLock().unlock(); } - }, 3, Duration.ofMillis(100), Collections.singletonList(InvalidVeniceSchemaException.class)); + }, 10, Duration.ofSeconds(1), Duration.ofMinutes(1), Duration.ofMinutes(5), retriableExceptions); + long timePassed = System.currentTimeMillis() - currentTimestamp; + logger.info( + "Obtain superset schema id: {} for store {} with time in milliseconds: {}.", + supersetSchemaId, + storeName, + timePassed); return supersetSchemaEntry.get(); } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepositoryTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepositoryTest.java index 646a2e4b945..74380d9a3cd 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepositoryTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlySchemaRepositoryTest.java @@ -135,7 +135,7 @@ public void testForceRefreshSchemaData() { // 3 times force refresh still won't get the schema, exception should be thrown. when(store.getLatestSuperSetValueSchemaId()).thenReturn(2); Assert.assertThrows(InvalidVeniceSchemaException.class, () -> schemaRepository.getSupersetSchema(storeName)); - verify(schemaRepository, times(7)).forceRefreshSchemaData(any(), any()); + verify(schemaRepository, times(14)).forceRefreshSchemaData(any(), any()); when(store.getLatestSuperSetValueSchemaId()).thenReturn(SchemaData.INVALID_VALUE_SCHEMA_ID); Assert.assertNull(schemaRepository.getSupersetSchema(storeName));