Skip to content

Commit

Permalink
[server] Extend retry for new superset schema fetching. (#897)
Browse files Browse the repository at this point in the history
Previous when user updates a new value schema to the AAWC store, the ongoing ingestion will get acknowledged by ZK store to refresh the schema. Previous retry for fetching the new schema from zk is 3 times with a fixed 100ms, which is very short. Without fetching new schema, the store ingestion could fail. For a real-prod system, usually server need to wait for 3~5 seconds to fetch this schema after new schema registered. Here we use exponential back off retry.

Co-authored-by: Hao Xu <[email protected]>
  • Loading branch information
haoxu07 and Hao Xu authored Mar 14, 2024
1 parent c769e89 commit eed157b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ SchemaEntry forceRefreshSupersetSchemaWithRetry(String storeName) {
Store store = getStoreRepository().getStore(storeName);
int supersetSchemaId = store.getLatestSuperSetValueSchemaId();
AtomicReference<SchemaEntry> supersetSchemaEntry = new AtomicReference<>();
RetryUtils.executeWithMaxAttempt(() -> {
long currentTimestamp = System.currentTimeMillis();
List<Class<? extends Throwable>> retriableExceptions =
Collections.singletonList(InvalidVeniceSchemaException.class);
RetryUtils.executeWithMaxAttemptAndExponentialBackoff(() -> {
try {
getSchemaLock().writeLock().lock();
SchemaData schemaData = getSchemaMap().get(storeName);
Expand All @@ -176,7 +179,13 @@ SchemaEntry forceRefreshSupersetSchemaWithRetry(String storeName) {
} finally {
getSchemaLock().writeLock().unlock();
}
}, 3, Duration.ofMillis(100), Collections.singletonList(InvalidVeniceSchemaException.class));
}, 10, Duration.ofSeconds(1), Duration.ofMinutes(1), Duration.ofMinutes(5), retriableExceptions);
long timePassed = System.currentTimeMillis() - currentTimestamp;
logger.info(
"Obtain superset schema id: {} for store {} with time in milliseconds: {}.",
supersetSchemaId,
storeName,
timePassed);
return supersetSchemaEntry.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testForceRefreshSchemaData() {
// 3 times force refresh still won't get the schema, exception should be thrown.
when(store.getLatestSuperSetValueSchemaId()).thenReturn(2);
Assert.assertThrows(InvalidVeniceSchemaException.class, () -> schemaRepository.getSupersetSchema(storeName));
verify(schemaRepository, times(7)).forceRefreshSchemaData(any(), any());
verify(schemaRepository, times(14)).forceRefreshSchemaData(any(), any());

when(store.getLatestSuperSetValueSchemaId()).thenReturn(SchemaData.INVALID_VALUE_SCHEMA_ID);
Assert.assertNull(schemaRepository.getSupersetSchema(storeName));
Expand Down

0 comments on commit eed157b

Please sign in to comment.