Skip to content

Commit 14d05d4

Browse files
authored
[server][da-vinci-client] Retry non-blocking metadata fetch (#1232)
In the recent past we have seen cases where the end-offset fetch call stalls for a very long time. This can happen for multiple reasons, such as a general Kafka outage, a mismatched VT vs. RT partition count leading to a non-existent partition, or other network issues. Even though we cache the offset value, there is a blocking call before the cache is updated. This holds the lock for a very long time, and subsequent calls (either from metrics collection or from the ready-to-serve check) wait forever. Since the drainer thread is shared, this blocks the processing of other resources in that drainer thread, leading to cluster-wide impact. This PR checks for a cache miss and returns immediately with a sentinel value (-1) while a non-blocking call updates the cache asynchronously.
1 parent 0cd9b9e commit 14d05d4

File tree

6 files changed

+55
-4
lines changed

6 files changed

+55
-4
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import com.linkedin.venice.utils.LatencyUtils;
104104
import com.linkedin.venice.utils.PartitionUtils;
105105
import com.linkedin.venice.utils.RedundantExceptionFilter;
106+
import com.linkedin.venice.utils.RetryUtils;
106107
import com.linkedin.venice.utils.SparseConcurrentList;
107108
import com.linkedin.venice.utils.Time;
108109
import com.linkedin.venice.utils.Timer;
@@ -2237,7 +2238,18 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop
22372238
if (offsetFromConsumer >= 0) {
22382239
return offsetFromConsumer;
22392240
}
2240-
return getTopicManager(kafkaUrl).getLatestOffsetCached(pubSubTopic, partition);
2241+
try {
2242+
return RetryUtils.executeWithMaxAttempt(() -> {
2243+
long offset = getTopicManager(kafkaUrl).getLatestOffsetCachedNonBlocking(pubSubTopic, partition);
2244+
if (offset == -1) {
2245+
throw new VeniceException("Found latest offset -1");
2246+
}
2247+
return offset;
2248+
}, 5, Duration.ofSeconds(1), Collections.singletonList(VeniceException.class));
2249+
} catch (Exception e) {
2250+
LOGGER.error("Could not find latest offset for {} even after 5 retries", pubSubTopic.getName());
2251+
return -1;
2252+
}
22412253
}
22422254

22432255
protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition) {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/ThinClientMetaStoreBasedRepository.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.linkedin.venice.utils.VeniceProperties;
2424
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
2525
import java.time.Duration;
26-
import java.util.Arrays;
26+
import java.util.Collections;
2727
import java.util.HashMap;
2828
import java.util.Map;
2929
import java.util.concurrent.Callable;
@@ -91,15 +91,15 @@ protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) {
9191
icProvider == null ? supplier : () -> icProvider.call(getClass().getCanonicalName(), supplier);
9292
StoreMetaValue value = RetryUtils.executeWithMaxAttempt(() -> {
9393
try {
94-
return wrappedSupplier.call().get(1, TimeUnit.SECONDS);
94+
return wrappedSupplier.call().get(5, TimeUnit.SECONDS);
9595
} catch (ServiceDiscoveryException e) {
9696
throw e;
9797
} catch (Exception e) {
9898
throw new VeniceRetriableException(
9999
"Failed to get data from meta store using thin client for store: " + storeName + " with key: " + key,
100100
e);
101101
}
102-
}, 5, Duration.ofSeconds(1), Arrays.asList(VeniceRetriableException.class));
102+
}, 10, Duration.ofSeconds(1), Collections.singletonList(VeniceRetriableException.class));
103103

104104
if (value == null) {
105105
throw new MissingKeyInStoreMetadataException(key.toString(), StoreMetaValue.class.getSimpleName());

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3017,6 +3017,9 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica
30173017
doReturn(5L).when(mockTopicManager).getLatestOffsetCached(any(), anyInt());
30183018
doReturn(150L).when(mockTopicManagerRemoteKafka).getLatestOffsetCached(any(), anyInt());
30193019
doReturn(150L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any());
3020+
long endOffset =
3021+
storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 1);
3022+
assertEquals(endOffset, 150L);
30203023
if (nodeType == NodeType.LEADER) {
30213024
// case 6a: leader replica => partition is not ready to serve
30223025
doReturn(LeaderFollowerStateType.LEADER).when(mockPcsBufferReplayStartedRemoteLagging).getLeaderFollowerState();
@@ -3176,6 +3179,15 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
31763179
} else {
31773180
assertTrue(storeIngestionTaskUnderTest.isReadyToServe(mockPcsMultipleSourceKafkaServers));
31783181
}
3182+
doReturn(10L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any());
3183+
long endOffset =
3184+
storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0);
3185+
assertEquals(endOffset, 10L);
3186+
doReturn(-1L).when(aggKafkaConsumerService).getLatestOffsetBasedOnMetrics(anyString(), any(), any());
3187+
endOffset =
3188+
storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0);
3189+
assertEquals(endOffset, 0L);
3190+
31793191
}
31803192

31813193
@DataProvider

internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,11 @@ public long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId) {
731731
return topicMetadataFetcher.getLatestOffsetCached(new PubSubTopicPartitionImpl(pubSubTopic, partitionId));
732732
}
733733

734+
public long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId) {
735+
return topicMetadataFetcher
736+
.getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, partitionId));
737+
}
738+
734739
public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries) {
735740
return topicMetadataFetcher.getProducerTimestampOfLastDataMessageWithRetries(pubSubTopicPartition, retries);
736741
}

internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,25 @@ CompletableFuture<Long> getLatestOffsetWithRetriesAsync(PubSubTopicPartition pub
379379
.supplyAsync(() -> getLatestOffsetWithRetries(pubSubTopicPartition, retries), threadPoolExecutor);
380380
}
381381

382+
long getLatestOffsetCachedNonBlocking(PubSubTopicPartition pubSubTopicPartition) {
383+
ValueAndExpiryTime<Long> cachedValue;
384+
cachedValue = latestOffsetCache.get(pubSubTopicPartition);
385+
updateCacheAsync(
386+
pubSubTopicPartition,
387+
cachedValue,
388+
latestOffsetCache,
389+
() -> getLatestOffsetWithRetriesAsync(
390+
pubSubTopicPartition,
391+
DEFAULT_MAX_RETRIES_FOR_POPULATING_TMD_CACHE_ENTRY));
392+
if (cachedValue == null) {
393+
cachedValue = latestOffsetCache.get(pubSubTopicPartition);
394+
if (cachedValue == null) {
395+
return -1;
396+
}
397+
}
398+
return cachedValue.getValue();
399+
}
400+
382401
long getLatestOffsetCached(PubSubTopicPartition pubSubTopicPartition) {
383402
ValueAndExpiryTime<Long> cachedValue;
384403
try {

internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,9 @@ public void testGetTopicLatestOffsets() {
260260
assertEquals(res.size(), offsetsMap.size());
261261
assertEquals(res.get(0), 111L);
262262
assertEquals(res.get(1), 222L);
263+
assertEquals(
264+
topicMetadataFetcher.getLatestOffsetCachedNonBlocking(new PubSubTopicPartitionImpl(pubSubTopic, 0)),
265+
-1);
263266

264267
verify(consumerMock, times(3)).partitionsFor(pubSubTopic);
265268
verify(consumerMock, times(1)).endOffsets(eq(offsetsMap.keySet()), any(Duration.class));

0 commit comments

Comments
 (0)