diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
index 6757f4d61b1..bb9dcfa7e3a 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
@@ -83,6 +83,8 @@ import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_IDLE_TIME_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_NETTY_WORKER_THREADS;
 import static com.linkedin.venice.ConfigKeys.SERVER_NODE_CAPACITY_RCU;
+import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND;
+import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND;
 import static com.linkedin.venice.ConfigKeys.SERVER_NUM_SCHEMA_FAST_CLASS_WARMUP;
 import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_OPTIMIZE_DATABASE_FOR_BACKUP_VERSION_NO_READ_THRESHOLD_SECONDS;
@@ -110,6 +112,9 @@ import static com.linkedin.venice.ConfigKeys.SERVER_SSL_HANDSHAKE_THREAD_POOL_SIZE;
 import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_STORE_TO_EARLY_TERMINATION_THRESHOLD_MS_MAP;
+import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED;
+import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND;
+import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
 import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
 import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
@@ -445,6 +450,12 @@ public class VeniceServerConfig extends VeniceClusterConfig {
 
   private final long ingestionHeartbeatIntervalMs;
 
+  private final boolean stuckConsumerRepairEnabled;
+  private final int stuckConsumerRepairIntervalSecond;
+  private final int stuckConsumerDetectionRepairThresholdSecond;
+  private final int nonExistingTopicIngestionTaskKillThresholdSecond;
+  private final int nonExistingTopicCheckRetryIntervalSecond;
+
   public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
     this(serverProperties, Collections.emptyMap());
   }
@@ -731,6 +742,21 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map [...]
[...]
 [...] kafkaClusterUrlResolver;
+  private final Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new VeniceConcurrentHashMap<>();
+  private ScheduledExecutorService stuckConsumerRepairExecutorService;
+
   public AggKafkaConsumerService(
       final PubSubConsumerAdapterFactory consumerFactory,
       TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier,
@@ -65,7 +78,8 @@ public AggKafkaConsumerService(
       KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler,
       final MetricsRepository metricsRepository,
       TopicExistenceChecker topicExistenceChecker,
-      final PubSubMessageDeserializer pubSubDeserializer) {
+      final PubSubMessageDeserializer pubSubDeserializer,
+      Consumer<String> killIngestionTaskRunnable) {
     this.consumerFactory = consumerFactory;
     this.readCycleDelayMs = serverConfig.getKafkaReadCycleDelayMs();
     this.numOfConsumersPerKafkaCluster = serverConfig.getConsumerPoolSizePerKafkaCluster();
@@ -83,6 +97,26 @@ public AggKafkaConsumerService(
     this.pubSubDeserializer = pubSubDeserializer;
     this.sslPropertiesSupplier = sslPropertiesSupplier;
     this.kafkaClusterUrlResolver = serverConfig.getKafkaClusterUrlResolver();
+
+    if (serverConfig.isStuckConsumerRepairEnabled()) {
+      this.stuckConsumerRepairExecutorService = Executors.newSingleThreadScheduledExecutor(
+          new DaemonThreadFactory(this.getClass().getName() + "-StuckConsumerRepair"));
+      int intervalInSeconds = serverConfig.getStuckConsumerRepairIntervalSecond();
+      this.stuckConsumerRepairExecutorService.scheduleAtFixedRate(
+          getStuckConsumerDetectionAndRepairRunnable(
+              kafkaServerToConsumerServiceMap,
+              versionTopicStoreIngestionTaskMapping,
+              TimeUnit.SECONDS.toMillis(serverConfig.getStuckConsumerDetectionRepairThresholdSecond()),
+              TimeUnit.SECONDS.toMillis(serverConfig.getNonExistingTopicIngestionTaskKillThresholdSecond()),
+              TimeUnit.SECONDS.toMillis(serverConfig.getNonExistingTopicCheckRetryIntervalSecond()),
+              new StuckConsumerRepairStats(metricsRepository),
+              killIngestionTaskRunnable),
+          intervalInSeconds,
+          intervalInSeconds,
+          TimeUnit.SECONDS);
+      LOGGER.info("Started stuck consumer repair service with checking interval: {} seconds", intervalInSeconds);
+    }
+
     LOGGER.info("Successfully initialized AggKafkaConsumerService");
   }
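The constructor wiring above amounts to a small reusable pattern: a single-threaded daemon scheduler that runs the detection-and-repair task at a fixed rate. A minimal sketch of that pattern, with a plain thread factory standing in for Venice's DaemonThreadFactory (all names here are hypothetical, not part of the PR):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class RepairSchedulerSketch {
  public static ScheduledExecutorService schedule(Runnable detectAndRepair, int intervalSeconds) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
      // Daemon thread so the repair loop never blocks JVM shutdown.
      Thread thread = new Thread(r, "StuckConsumerRepair");
      thread.setDaemon(true);
      return thread;
    });
    // Single-threaded + fixed rate: runs never overlap; a slow run simply delays the next one.
    executor.scheduleAtFixedRate(detectAndRepair, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    return executor;
  }
}
```

Shutting such an executor down with shutdownNow(), as stopInner() does below, also attempts to interrupt a detection run that is mid-sleep in the retry loop.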
@@ -100,6 +134,106 @@ public void stopInner() throws Exception {
     for (KafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
       consumerService.stop();
     }
+    if (this.stuckConsumerRepairExecutorService != null) {
+      this.stuckConsumerRepairExecutorService.shutdownNow();
+    }
+  }
+
+  protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
+      Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap,
+      Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping,
+      long stuckConsumerRepairThresholdMs,
+      long nonExistingTopicIngestionTaskKillThresholdMs,
+      long nonExistingTopicRetryIntervalMs,
+      StuckConsumerRepairStats stuckConsumerRepairStats,
+      Consumer<String> killIngestionTaskRunnable) {
+    return () -> {
+      /**
+       * The following logic could be optimized further: if the max delay observed in the previous run is far below
+       * the threshold, then the max possible delay in the next run is bounded by the schedule interval plus that
+       * previous max delay, and when that bound is still below the threshold this function could return immediately.
+       * We are not adopting such an optimization right now because:
+       * 1. It requires extra state to maintain.
+       * 2. The schedule interval is supposed to be high.
+       * 3. The check is cheap when there is no stuck consumer.
+       */
+      boolean scanStoreIngestionTaskToFixStuckConsumer = false;
+      for (KafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
+        long maxDelayMs = consumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool();
+        if (maxDelayMs >= stuckConsumerRepairThresholdMs) {
+          scanStoreIngestionTaskToFixStuckConsumer = true;
+          LOGGER.warn("Found a consumer that has been stuck for {} ms; will start the repair procedure", maxDelayMs);
+          break;
+        }
+      }
+      if (!scanStoreIngestionTaskToFixStuckConsumer) {
+        return;
+      }
+      stuckConsumerRepairStats.recordStuckConsumerFound();
+
+      /**
+       * Collect the SITs whose version topic doesn't exist, by checking
+       * {@link StoreIngestionTask#isProducingVersionTopicHealthy()}; this function will keep checking the version
+       * topic's health for a period of {@link nonExistingTopicIngestionTaskKillThresholdMs} to tolerate transient
+       * topic-discovery issues.
+       */
+      Map<String, StoreIngestionTask> versionTopicIngestionTaskMappingForNonExistingTopic = new HashMap<>();
+      versionTopicStoreIngestionTaskMapping.forEach((vt, sit) -> {
+        try {
+          if (!sit.isProducingVersionTopicHealthy()) {
+            versionTopicIngestionTaskMappingForNonExistingTopic.put(vt, sit);
+            LOGGER.warn("The producing version topic: {} is not healthy", vt);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Got exception while checking topic existence for version topic: {}", vt, e);
+        }
+      });
+      int maxAttempts =
+          (int) Math.ceil((double) nonExistingTopicIngestionTaskKillThresholdMs / nonExistingTopicRetryIntervalMs);
+      for (int cnt = 0; cnt < maxAttempts; ++cnt) {
+        Iterator<Map.Entry<String, StoreIngestionTask>> iterator =
+            versionTopicIngestionTaskMappingForNonExistingTopic.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<String, StoreIngestionTask> entry = iterator.next();
+          String versionTopic = entry.getKey();
+          StoreIngestionTask sit = entry.getValue();
+          try {
+            if (sit.isProducingVersionTopicHealthy()) {
+              /**
+               * If the version topic becomes available after retries, remove it from the tracking map.
+               */
+              iterator.remove();
+              LOGGER.info("The producing version topic: {} has become healthy", versionTopic);
+            }
+          } catch (Exception e) {
+            LOGGER.error("Got exception while checking topic existence for version topic: {}", versionTopic, e);
+          } finally {
+            Utils.sleep(nonExistingTopicRetryIntervalMs);
+          }
+        }
+      }
+
+      AtomicBoolean repairSomeIngestionTask = new AtomicBoolean(false);
+      versionTopicIngestionTaskMappingForNonExistingTopic.forEach((vt, sit) -> {
+        LOGGER.warn(
+            "The producing version topic is not healthy for "
+                + "store version: {}; will kill the ingestion task to try to unblock the shared consumer",
+            vt);
+        /**
+         * The following call will interrupt all the stuck {@link org.apache.kafka.clients.producer.KafkaProducer#send}
+         * calls to non-existing topics.
+         */
+        sit.closeVeniceWriters(false);
+        killIngestionTaskRunnable.accept(vt);
+        repairSomeIngestionTask.set(true);
+        stuckConsumerRepairStats.recordIngestionTaskRepair();
+      });
+      if (!repairSomeIngestionTask.get()) {
+        LOGGER.error("Didn't find any suspicious ingestion task; please contact the developers to investigate further");
+        stuckConsumerRepairStats.recordRepairFailure();
+      }
+    };
   }
 
   /**
@@ -235,6 +369,8 @@ public ConsumedDataReceiver [...]
[...]
 [...] consumerService.unsubscribeAll(versionTopic));
+    versionTopicStoreIngestionTaskMapping.remove(versionTopic.getName());
   }
 
   void pauseConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java
index 663e9b99372..4e62ac6c08b 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java
@@ -116,7 +116,7 @@ protected KafkaConsumerService(
         : createKafkaConsumerServiceStats(
             metricsRepository,
             kafkaClusterAlias,
-            this::getMaxElapsedTimeSinceLastPollInConsumerPool);
+            this::getMaxElapsedTimeMSSinceLastPollInConsumerPool);
     for (int i = 0; i < numOfConsumersPerKafkaCluster; ++i) {
       /**
        * We need to assign a unique client id across all the storage nodes, otherwise, they will fail into the same throttling bucket.
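The renamed getMaxElapsedTimeMSSinceLastPollInConsumerPool() is the detection signal consumed by the repair runnable above: the pool-wide maximum of "now minus last successful poll", with -1 as the empty-pool sentinel (visible in the hunk below). A standalone sketch of that aggregation — hypothetical classes, not Venice's:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class PollTrackerSketch {
  private final AtomicLong lastPollMs = new AtomicLong(System.currentTimeMillis());

  void onSuccessfulPoll() {
    lastPollMs.set(System.currentTimeMillis());
  }

  long elapsedMsSinceLastPoll() {
    return System.currentTimeMillis() - lastPollMs.get();
  }

  /** Pool-wide maximum; -1 when the pool is empty, matching the sentinel in the hunk below. */
  static long maxElapsedMs(List<PollTrackerSketch> pool) {
    long max = -1;
    for (PollTrackerSketch tracker: pool) {
      max = Math.max(max, tracker.elapsedMsSinceLastPoll());
    }
    return max;
  }
}
```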
@@ -324,7 +324,7 @@ private KafkaConsumerServiceStats createKafkaConsumerServiceStats(
         getMaxElapsedTimeSinceLastPollInConsumerPool);
   }
 
-  private long getMaxElapsedTimeSinceLastPollInConsumerPool() {
+  public long getMaxElapsedTimeMSSinceLastPollInConsumerPool() {
     long maxElapsedTimeSinceLastPollInConsumerPool = -1;
     int slowestTaskId = -1;
     long elapsedTimeSinceLastPoll;
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
index d9762137e94..4e4dd282c3d 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
@@ -448,7 +448,8 @@ public void handleStoreDeleted(Store store) {
         kafkaClusterBasedRecordThrottler,
         metricsRepository,
         new MetadataRepoBasedTopicExistingCheckerImpl(this.getMetadataRepo()),
-        pubSubDeserializer);
+        pubSubDeserializer,
+        (topicName) -> this.killConsumptionTask(topicName));
     /**
      * After initializing a {@link AggKafkaConsumerService} service, it doesn't contain KafkaConsumerService yet until
      * a new Kafka cluster is registered; here we explicitly create KafkaConsumerService for the local Kafka cluster.
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index cf1071d517f..145bd5d3d08 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -282,7 +282,7 @@ public LeaderFollowerStoreIngestionTask(
   }
 
   @Override
-  protected void closeVeniceWriters(boolean doFlush) {
+  public void closeVeniceWriters(boolean doFlush) {
     if (veniceWriter.isPresent()) {
       veniceWriter.get().close(doFlush);
     }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
index e72e6065f2b..25df0e06c79 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
@@ -1390,6 +1390,13 @@ public void run() {
       LOGGER.info("{} has been killed.", consumerTaskId);
       statusReportAdapter.reportKilled(partitionConsumptionStateMap.values(), e);
       doFlush = false;
+      if (isCurrentVersion.getAsBoolean()) {
+        /**
+         * The current version can be killed when {@link AggKafkaConsumerService} discovers issues with the
+         * producing topics; metrics are reported here for that case.
+         */
+        handleIngestionException(e);
+      }
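Back in the repair runnable, an unhealthy version topic is re-checked for up to the kill threshold before closeVeniceWriters(false) and the kill callback fire; the number of re-checks is a ceiling division of the two settings. A worked example using the values the unit tests below pass in (1000 ms kill threshold, 200 ms retry interval):

```java
public final class RetryBudgetSketch {
  /** Ceiling division so the retries span the full kill threshold even when it is not a multiple of the interval. */
  static int maxAttempts(long killThresholdMs, long retryIntervalMs) {
    return (int) Math.ceil((double) killThresholdMs / retryIntervalMs);
  }

  public static void main(String[] args) {
    // ceil(1000 / 200) = 5 retries; together with the one initial scan that is 6 health checks,
    // matching verify(badTask, times(6)).isProducingVersionTopicHealthy() in the unit test below.
    System.out.println(maxAttempts(1000, 200)); // prints 5
  }
}
```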
     } catch (VeniceChecksumException e) {
       /**
        * It's possible to receive checksum verification failure exception here from the above syncOffset() call.
@@ -1521,7 +1528,7 @@ private void internalClose(boolean doFlush) {
     LOGGER.info("Store ingestion task for store: {} is closed", kafkaVersionTopic);
   }
 
-  protected void closeVeniceWriters(boolean doFlush) {
+  public void closeVeniceWriters(boolean doFlush) {
   }
 
   protected void closeVeniceViewWriters() {
@@ -3387,6 +3394,10 @@ public PubSubTopic getVersionTopic() {
     return versionTopic;
   }
 
+  public PubSubTopic getRealtimeTopic() {
+    return realTimeTopic;
+  }
+
   public boolean isMetricsEmissionEnabled() {
     return emitMetrics.get();
   }
@@ -3770,4 +3781,21 @@ protected boolean shouldUpdateUpstreamOffset(PubSubMessage [...]
[...]
 [...] getMaxElapsedTimeSinceLastPollInConsumerPool.getAsLong()));
     // consumer record number per second returned by Kafka consumer poll.
     pollResultNumSensor = registerSensor("consumer_poll_result_num", new Avg(), new Total());
     pollNonZeroResultNumSensor = registerSensor("consumer_poll_non_zero_result_num", new Avg(), new Total());
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StuckConsumerRepairStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StuckConsumerRepairStats.java
new file mode 100644
index 00000000000..189f94b7498
--- /dev/null
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StuckConsumerRepairStats.java
@@ -0,0 +1,33 @@
+package com.linkedin.davinci.stats;
+
+import com.linkedin.venice.stats.AbstractVeniceStats;
+import io.tehuti.metrics.MetricsRepository;
+import io.tehuti.metrics.Sensor;
+import io.tehuti.metrics.stats.OccurrenceRate;
+
+
+public class StuckConsumerRepairStats extends AbstractVeniceStats {
+  private Sensor stuckConsumerFound;
+  private Sensor ingestionTaskRepair;
+  private Sensor repairFailure;
+
+  public StuckConsumerRepairStats(MetricsRepository metricsRepository) {
+    super(metricsRepository, "StuckConsumerRepair");
+
+    this.stuckConsumerFound = registerSensor("stuck_consumer_found", new OccurrenceRate());
+    this.ingestionTaskRepair = registerSensor("ingestion_task_repair", new OccurrenceRate());
+    this.repairFailure = registerSensor("repair_failure", new OccurrenceRate());
+  }
+
+  public void recordStuckConsumerFound() {
+    stuckConsumerFound.record();
+  }
+
+  public void recordIngestionTaskRepair() {
+    ingestionTaskRepair.record();
+  }
+
+  public void recordRepairFailure() {
+    repairFailure.record();
+  }
+}
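StuckConsumerRepairStats above follows the usual Tehuti recipe: one sensor per event, each wrapping an OccurrenceRate, and the full metric name comes out as .&lt;stats-name&gt;--&lt;sensor-name&gt;.OccurrenceRate — exactly the names the integration test below asserts. A minimal sketch against Tehuti directly, bypassing the AbstractVeniceStats wrapper; it assumes Tehuti's Sensor.add(String, MeasurableStat) overload:

```java
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.OccurrenceRate;

public final class RepairStatsSketch {
  public static void main(String[] args) {
    MetricsRepository repo = new MetricsRepository();
    Sensor found = repo.sensor("stuck_consumer_found");
    // The full metric name mirrors Venice's ".<stats-name>--<sensor-name>.<stat>" convention.
    found.add(".StuckConsumerRepair--stuck_consumer_found.OccurrenceRate", new OccurrenceRate());
    found.record();
    // OccurrenceRate is a decaying rate, so assert > 0 shortly after recording, as the test does.
    System.out.println(repo.metrics().get(".StuckConsumerRepair--stuck_consumer_found.OccurrenceRate").value());
  }
}
```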
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java
new file mode 100644
index 00000000000..eae4169fff5
--- /dev/null
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java
@@ -0,0 +1,124 @@
+package com.linkedin.davinci.kafka.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.linkedin.davinci.stats.StuckConsumerRepairStats;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.testng.annotations.Test;
+
+
+public class AggKafkaConsumerServiceTest {
+  @Test
+  public void testGetStuckConsumerDetectionAndRepairRunnable() {
+    Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap = new HashMap<>();
+    Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new HashMap<>();
+    long stuckConsumerRepairThresholdMs = 100;
+    long nonExistingTopicIngestionTaskKillThresholdMs = 1000;
+    StuckConsumerRepairStats stuckConsumerRepairStats = mock(StuckConsumerRepairStats.class);
+
+    // Everything is good
+    KafkaConsumerService goodConsumerService = mock(KafkaConsumerService.class);
+    when(goodConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool()).thenReturn(10L);
+    kafkaServerToConsumerServiceMap.put("good", goodConsumerService);
+    StoreIngestionTask goodTask = mock(StoreIngestionTask.class);
+    when(goodTask.isProducingVersionTopicHealthy()).thenReturn(true);
+    versionTopicStoreIngestionTaskMapping.put("good_task", goodTask);
+
+    Consumer<String> killIngestionTaskRunnable = mock(Consumer.class);
+
+    Runnable repairRunnable = AggKafkaConsumerService.getStuckConsumerDetectionAndRepairRunnable(
+        kafkaServerToConsumerServiceMap,
+        versionTopicStoreIngestionTaskMapping,
+        stuckConsumerRepairThresholdMs,
+        nonExistingTopicIngestionTaskKillThresholdMs,
+        200,
+        stuckConsumerRepairStats,
+        killIngestionTaskRunnable);
+    repairRunnable.run();
+    verify(goodConsumerService).getMaxElapsedTimeMSSinceLastPollInConsumerPool();
+    verify(stuckConsumerRepairStats, never()).recordStuckConsumerFound();
+    verify(stuckConsumerRepairStats, never()).recordIngestionTaskRepair();
+    verify(stuckConsumerRepairStats, never()).recordRepairFailure();
+    verify(killIngestionTaskRunnable, never()).accept(any());
+  }
+
+  @Test
+  public void testGetStuckConsumerDetectionAndRepairRunnableForTransientNonExistingTopic() {
+    Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap = new HashMap<>();
+    Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new HashMap<>();
+    long stuckConsumerRepairThresholdMs = 100;
+    long nonExistingTopicIngestionTaskKillThresholdMs = 1000;
+    StuckConsumerRepairStats stuckConsumerRepairStats = mock(StuckConsumerRepairStats.class);
+
+    Consumer<String> killIngestionTaskRunnable = mock(Consumer.class);
+
+    Runnable repairRunnable = AggKafkaConsumerService.getStuckConsumerDetectionAndRepairRunnable(
+        kafkaServerToConsumerServiceMap,
+        versionTopicStoreIngestionTaskMapping,
+        stuckConsumerRepairThresholdMs,
+        nonExistingTopicIngestionTaskKillThresholdMs,
+        200,
+        stuckConsumerRepairStats,
+        killIngestionTaskRunnable);
+    // One stuck consumer
+    KafkaConsumerService badConsumerService = mock(KafkaConsumerService.class);
+    when(badConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool()).thenReturn(1000L);
+    kafkaServerToConsumerServiceMap.put("bad", badConsumerService);
+
+    StoreIngestionTask transientBadTask = mock(StoreIngestionTask.class);
+    when(transientBadTask.isProducingVersionTopicHealthy()).thenReturn(false).thenReturn(true);
+    versionTopicStoreIngestionTaskMapping.put("transient_bad_task", transientBadTask);
+    repairRunnable.run();
+    verify(badConsumerService).getMaxElapsedTimeMSSinceLastPollInConsumerPool();
+    verify(stuckConsumerRepairStats).recordStuckConsumerFound();
+    verify(stuckConsumerRepairStats, never()).recordIngestionTaskRepair();
+    verify(stuckConsumerRepairStats).recordRepairFailure();
+    verify(killIngestionTaskRunnable, never()).accept(any());
+  }
+
+  @Test
+  public void testGetStuckConsumerDetectionAndRepairRunnableForNonExistingTopic() {
+    Map<String, KafkaConsumerService> kafkaServerToConsumerServiceMap = new HashMap<>();
+    Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new HashMap<>();
+    long stuckConsumerRepairThresholdMs = 100;
+    long nonExistingTopicIngestionTaskKillThresholdMs = 1000;
+    StuckConsumerRepairStats stuckConsumerRepairStats = mock(StuckConsumerRepairStats.class);
+
+    Consumer<String> killIngestionTaskRunnable = mock(Consumer.class);
+
+    Runnable repairRunnable = AggKafkaConsumerService.getStuckConsumerDetectionAndRepairRunnable(
+        kafkaServerToConsumerServiceMap,
+        versionTopicStoreIngestionTaskMapping,
+        stuckConsumerRepairThresholdMs,
+        nonExistingTopicIngestionTaskKillThresholdMs,
+        200,
+        stuckConsumerRepairStats,
+        killIngestionTaskRunnable);
+    // One stuck consumer
+    KafkaConsumerService badConsumerService = mock(KafkaConsumerService.class);
+    when(badConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool()).thenReturn(1000L);
+    kafkaServerToConsumerServiceMap.put("bad", badConsumerService);
+    StoreIngestionTask badTask = mock(StoreIngestionTask.class);
+    when(badTask.isProducingVersionTopicHealthy()).thenReturn(false);
+    versionTopicStoreIngestionTaskMapping.put("bad_task", badTask);
+    repairRunnable.run();
+    verify(badConsumerService).getMaxElapsedTimeMSSinceLastPollInConsumerPool();
+    verify(badTask, times(6)).isProducingVersionTopicHealthy();
+    verify(badTask).closeVeniceWriters(false);
+    verify(killIngestionTaskRunnable).accept("bad_task");
+    verify(stuckConsumerRepairStats).recordStuckConsumerFound();
+    verify(stuckConsumerRepairStats).recordIngestionTaskRepair();
+
+    // One stuck consumer without any problematic ingestion task
+    versionTopicStoreIngestionTaskMapping.remove("bad_task");
+    repairRunnable.run();
+    verify(stuckConsumerRepairStats).recordRepairFailure();
+  }
+}
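The transient case above leans on Mockito's consecutive stubbing: chained thenReturn values are served one per call, and the final value repeats thereafter, which is how a topic can look unhealthy on the initial scan yet healthy on the first retry. In isolation:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.function.BooleanSupplier;

public final class ConsecutiveStubbingSketch {
  public static void main(String[] args) {
    BooleanSupplier health = mock(BooleanSupplier.class);
    when(health.getAsBoolean()).thenReturn(false).thenReturn(true);
    System.out.println(health.getAsBoolean()); // false: the initial scan flags the topic
    System.out.println(health.getAsBoolean()); // true: the first retry sees it recover
    System.out.println(health.getAsBoolean()); // true: the last stubbed value repeats
  }
}
```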
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
index 8705cf340aa..9761c05ea9d 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
@@ -3550,7 +3550,6 @@ public void testMaybeSendIngestionHeartbeat(boolean isActiveActive) {
     doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any());
     StoreIngestionTaskFactory ingestionTaskFactory = TestUtils.getStoreIngestionTaskBuilder(storeName)
-        .setTopicManagerRepository(mockTopicManagerRepository)
         .setStorageMetadataService(mockStorageMetadataService)
         .setMetadataRepository(mockReadOnlyStoreRepository)
         .setTopicManagerRepository(mockTopicManagerRepository)
@@ -3573,6 +3572,15 @@ public void testMaybeSendIngestionHeartbeat(boolean isActiveActive) {
     // Second invocation should be skipped since it shouldn't be time for another heartbeat yet.
     ingestionTask.maybeSendIngestionHeartbeat();
     verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any());
+
+    /**
+     * Leverage the same test to validate {@link StoreIngestionTask#isProducingVersionTopicHealthy()}.
+     */
+    when(mockTopicManager.containsTopic(eq(ingestionTask.getVersionTopic()))).thenReturn(true);
+    Assert.assertTrue(ingestionTask.isProducingVersionTopicHealthy());
+
+    when(mockTopicManager.containsTopic(eq(ingestionTask.getVersionTopic()))).thenReturn(false);
+    Assert.assertFalse(ingestionTask.isProducingVersionTopicHealthy());
   }
 
   private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/BasicClientStats.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/BasicClientStats.java
index 826e6e3e8e3..527000aa2c5 100644
--- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/BasicClientStats.java
+++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/BasicClientStats.java
@@ -16,6 +16,10 @@
  * This class offers very basic metrics for client, and right now, it is directly used by DaVinci.
  */
 public class BasicClientStats extends AbstractVeniceHttpStats {
+  private static final String SYSTEM_STORE_NAME_PREFIX = "venice_system_store_";
+
+  private static final MetricsRepository dummySystemStoreMetricRepo = new MetricsRepository();
+
   private final Sensor requestSensor;
   private final Sensor healthySensor;
   private final Sensor unhealthySensor;
@@ -38,7 +42,10 @@ public static BasicClientStats getClientStats(
   }
 
   protected BasicClientStats(MetricsRepository metricsRepository, String storeName, RequestType requestType) {
-    super(metricsRepository, storeName, requestType);
+    super(
+        storeName.startsWith(SYSTEM_STORE_NAME_PREFIX) ? dummySystemStoreMetricRepo : metricsRepository,
+        storeName,
+        requestType);
     requestSensor = registerSensor("request", requestRate);
     Rate healthyRequestRate = new OccurrenceRate();
     healthySensor = registerSensor("healthy_request", healthyRequestRate);
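The BasicClientStats change above silences system-store metrics by registering them against a private, reporter-less MetricsRepository rather than the caller's; the sensors still function, but nothing is exported. Reduced to its essence (hypothetical class, same prefix check as the diff):

```java
import io.tehuti.metrics.MetricsRepository;

final class MetricRepoRouterSketch {
  private static final String SYSTEM_STORE_NAME_PREFIX = "venice_system_store_";
  // Shared sink for system-store metrics; no reporters are ever attached to it.
  private static final MetricsRepository DUMMY_REPO = new MetricsRepository();

  static MetricsRepository choose(MetricsRepository realRepo, String storeName) {
    return storeName.startsWith(SYSTEM_STORE_NAME_PREFIX) ? DUMMY_REPO : realRepo;
  }
}
```

Moving this check out of AbstractVeniceHttpStats (next hunk) and down into BasicClientStats appears to narrow the suppression to the thin-client/DaVinci stats path instead of every HTTP stats class.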
diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java
index 0d709bc7ef3..9e09ffb4e1f 100644
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java
@@ -7,13 +7,10 @@
 public abstract class AbstractVeniceHttpStats extends AbstractVeniceStats {
-  private static final String SYSTEM_STORE_NAME_PREFIX = "venice_system_store_";
-  private static final MetricsRepository dummySystemStoreMetricRepo = new MetricsRepository();
-
   private final RequestType requestType;
 
   public AbstractVeniceHttpStats(MetricsRepository metricsRepository, String storeName, RequestType requestType) {
-    super(storeName.startsWith(SYSTEM_STORE_NAME_PREFIX) ? dummySystemStoreMetricRepo : metricsRepository, storeName);
+    super(metricsRepository, storeName);
     this.requestType = requestType;
   }
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
index 1a0c205b114..8c5eca94a8d 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
@@ -2015,4 +2015,32 @@ private ConfigKeys() {
    * with SOS, EOS or skipped records.
    */
   public static final String SERVER_INGESTION_HEARTBEAT_INTERVAL_MS = "server.ingestion.heartbeat.interval.ms";
+
+  /**
+   * Whether to enable stuck consumer repair in the server.
+   */
+  public static final String SERVER_STUCK_CONSUMER_REPAIR_ENABLED = "server.stuck.consumer.repair.enabled";
+
+  /**
+   * Interval at which the server runs stuck consumer detection.
+   */
+  public static final String SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND = "server.stuck.consumer.repair.second";
+
+  /**
+   * How long a consumer may go without a successful poll before it is considered stuck and repair kicks in.
+   */
+  public static final String SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND =
+      "server.stuck.consumer.repair.threshold.second";
+
+  /**
+   * How long the producing version topic may stay non-existent before the ingestion task is killed.
+   */
+  public static final String SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND =
+      "server.non.existing.topic.ingestion.task.kill.threshold.second";
+
+  /**
+   * Retry interval for the topic-existence check; this config works together with
+   * {@link #SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND} to decide whether a certain
+   * ingestion task should be killed.
+   */
+  public static final String SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND =
+      "server.non.existing.topic.check.retry.interval.second";
 }
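The five new keys pair up with the VeniceServerConfig fields added at the top of this diff; the constructor code that reads them is not visible in this excerpt. A plausible sketch of that wiring, assuming VeniceProperties' usual getBoolean/getInt overloads — the default values below are invented for illustration only:

```java
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;

import com.linkedin.venice.utils.VeniceProperties;

final class StuckConsumerConfigSketch {
  static void read(VeniceProperties serverProperties) {
    // Hypothetical defaults; the PR's actual defaults are not shown in this excerpt.
    boolean repairEnabled = serverProperties.getBoolean(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, true);
    int repairIntervalSecond = serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, 60);
    int repairThresholdSecond = serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND, 5 * 60);
    int killThresholdSecond =
        serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND, 15 * 60);
    int retryIntervalSecond = serverProperties.getInt(SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND, 60);
    System.out.printf(
        "repair=%s interval=%ds threshold=%ds kill=%ds retry=%ds%n",
        repairEnabled, repairIntervalSecond, repairThresholdSecond, killThresholdSecond, retryIntervalSecond);
  }
}
```

The integration test below exercises aggressive values (1 s interval, 2 s threshold, 5 s kill threshold, 1 s retry) so the whole repair cycle completes within its assertion windows.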
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java
new file mode 100644
index 00000000000..d98e69be70f
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java
@@ -0,0 +1,248 @@
+package com.linkedin.venice.endToEnd;
+
+import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
+import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
+import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
+import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER;
+import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
+import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
+import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED;
+import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND;
+import static com.linkedin.venice.ConfigKeys.SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND;
+import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
+import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY;
+import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_ENABLED;
+import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND;
+import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
+import static com.linkedin.venice.ConfigKeys.SSL_TO_KAFKA_LEGACY;
+import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS;
+import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
+import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps;
+import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer;
+import static com.linkedin.venice.utils.IntegrationTestPushUtils.runVPJ;
+import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendCustomSizeStreamingRecord;
+import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
+import com.linkedin.venice.client.store.AvroGenericStoreClient;
+import com.linkedin.venice.client.store.ClientConfig;
+import com.linkedin.venice.client.store.ClientFactory;
+import com.linkedin.venice.controllerapi.ControllerClient;
+import com.linkedin.venice.controllerapi.ControllerResponse;
+import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
+import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.integration.utils.ServiceFactory;
+import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
+import com.linkedin.venice.kafka.TopicManager;
+import com.linkedin.venice.meta.PersistenceType;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
+import com.linkedin.venice.utils.TestUtils;
+import com.linkedin.venice.utils.TestWriteUtils;
+import com.linkedin.venice.utils.Utils;
+import io.tehuti.metrics.MetricsRepository;
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.samza.system.SystemProducer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStuckConsumerRepair {
+  private static final Logger LOGGER = LogManager.getLogger(TestStuckConsumerRepair.class);
+  public static final int STREAMING_RECORD_SIZE = 1024;
+
+  private VeniceClusterWrapper sharedVenice;
+
+  @BeforeClass(alwaysRun = true)
+  public void setUp() {
+    sharedVenice = setUpCluster();
+  }
+
+  private static VeniceClusterWrapper setUpCluster() {
+    Properties extraProperties = new Properties();
+    extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "5");
+    VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 1, 2, 1000000, false, false, extraProperties);
+
+    // Add Venice Router
+    Properties routerProperties = new Properties();
+    cluster.addVeniceRouter(routerProperties);
+
+    // Add Venice Servers
+    Properties serverProperties = new Properties();
+    serverProperties.setProperty(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB.name());
+    serverProperties.setProperty(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
+    serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
+    serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "true");
+    serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
+
+    serverProperties.setProperty(SSL_TO_KAFKA_LEGACY, "false");
+    serverProperties.setProperty(SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER, "3");
+    serverProperties.setProperty(SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED, "true");
+    serverProperties.setProperty(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, "true");
+    serverProperties.setProperty(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, "1");
+    serverProperties.setProperty(SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND, "2");
+    serverProperties.setProperty(SERVER_NON_EXISTING_TOPIC_INGESTION_TASK_KILL_THRESHOLD_SECOND, "5");
+    serverProperties.setProperty(SERVER_NON_EXISTING_TOPIC_CHECK_RETRY_INTERNAL_SECOND, "1");
+
+    serverProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "4");
+    serverProperties.setProperty(
+        SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY,
+        KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name());
+    cluster.addVeniceServer(new Properties(), serverProperties);
+    cluster.addVeniceServer(new Properties(), serverProperties);
+
+    return cluster;
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void cleanUp() {
+    Utils.closeQuietlyWithErrorLogged(sharedVenice);
+  }
+
+  private void checkLargeRecord(AvroGenericStoreClient client, int index)
+      throws ExecutionException, InterruptedException {
+    String key = Integer.toString(index);
+    String value = client.get(key).get().toString();
+    assertEquals(
+        value.length(),
+        STREAMING_RECORD_SIZE,
+        "Expected a large record for key '" + key + "' but instead got: '" + value + "'.");
+
+    String expectedChar = Integer.toString(index).substring(0, 1);
+    for (int i = 0; i < value.length(); i++) {
+      assertEquals(value.substring(i, i + 1), expectedChar);
+    }
+  }
+
+  @Test(timeOut = 120 * 1000)
+  public void testStuckConsumerRepair() throws Exception {
+    SystemProducer veniceProducer = null;
+
+    VeniceClusterWrapper venice = sharedVenice;
+    try {
+      long streamingMessageLag = 2L;
+
+      String storeName = Utils.getUniqueString("hybrid-store");
+      File inputDir = getTempDataDirectory();
+      String inputDirPath = "file://" + inputDir.getAbsolutePath();
+      Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir); // records 1-100
+      Properties vpjProperties = defaultVPJProps(venice, inputDirPath, storeName);
+
+      try (ControllerClient controllerClient = createStoreForJob(venice.getClusterName(), recordSchema, vpjProperties);
+          AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(
+              ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(venice.getRandomRouterURL()));
+          TopicManager topicManager =
+              IntegrationTestPushUtils
+                  .getTopicManagerRepo(
+                      DEFAULT_KAFKA_OPERATION_TIMEOUT_MS,
+                      100,
+                      0L,
+                      venice.getPubSubBrokerWrapper(),
+                      sharedVenice.getPubSubTopicRepository())
+                  .getTopicManager()) {
+
+        Cache cacheNothingCache = Mockito.mock(Cache.class);
+        Mockito.when(cacheNothingCache.getIfPresent(Mockito.any())).thenReturn(null);
+        topicManager.setTopicConfigCache(cacheNothingCache);
+
+        ControllerResponse response = controllerClient.updateStore(
+            storeName,
+            new UpdateStoreQueryParams().setHybridRewindSeconds(120).setHybridOffsetLagThreshold(streamingMessageLag));
+
+        Assert.assertFalse(response.isError());
+
+        // Do a VPJ push
+        runVPJ(vpjProperties, 1, controllerClient);
+
+        // Verify some records (note, records 1-100 have been pushed)
+        TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> {
+          try {
+            for (int i = 1; i < 100; i++) {
+              String key = Integer.toString(i);
+              Object value = client.get(key).get();
+              assertNotNull(value, "Key " + i + " should not be missing!");
+              assertEquals(value.toString(), "test_name_" + key);
+            }
+          } catch (Exception e) {
+            throw new VeniceException(e);
+          }
+        });
+
+        // Write streaming records
+        veniceProducer = getSamzaProducer(venice, storeName, Version.PushType.STREAM);
+        for (int i = 1; i <= 10; i++) {
+          // The batch values are small, but the streaming records are "big" (i.e. not that big, but bigger than
+          // the server's max configured chunk size). In the scenario where chunking is disabled, the server's
+          // max chunk size is not altered, and thus this will be under threshold.
+          sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE);
+        }
+
+        // Run one more VPJ
+        runVPJ(vpjProperties, 2, controllerClient);
+
+        // Verify a streaming record and a batch record in the second version
+        checkLargeRecord(client, 2);
+        assertEquals(client.get("19").get().toString(), "test_name_19");
+
+        for (int i = 10; i <= 20; i++) {
+          sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE);
+        }
+        TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> {
+          try {
+            checkLargeRecord(client, 19);
+          } catch (Exception e) {
+            throw new VeniceException(e);
+          }
+        });
+
+        // Delete the v1 topic to simulate a stuck-producer issue
+        String topicForV1 = Version.composeKafkaTopic(storeName, 1);
+        topicManager.ensureTopicIsDeletedAndBlock(sharedVenice.getPubSubTopicRepository().getTopic(topicForV1));
+        LOGGER.info("Topic: {} has been deleted", topicForV1);
+        Utils.sleep(10000); // 10 seconds to let the Kafka client pick up the topic-deletion signal
+
+        // Start sending more streaming records
+        for (int i = 20; i <= 100; i++) {
+          sendCustomSizeStreamingRecord(veniceProducer, storeName, i, 1024);
+        }
+        // Verify that the stuck consumer repair logic does kick in
+        MetricsRepository serverRepo = venice.getVeniceServers().get(0).getMetricsRepository();
+        TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
+          assertTrue(
+              serverRepo.metrics().get(".StuckConsumerRepair--stuck_consumer_found.OccurrenceRate").value() > 0f);
+          assertTrue(
+              serverRepo.metrics().get(".StuckConsumerRepair--ingestion_task_repair.OccurrenceRate").value() > 0f);
+          assertTrue(serverRepo.metrics().get(".StuckConsumerRepair--repair_failure.OccurrenceRate").value() == 0.0f);
+        });
+
+        TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
+          try {
+            for (int i = 20; i <= 100; ++i) {
+              checkLargeRecord(client, i);
+            }
+          } catch (Exception e) {
+            throw new VeniceException(e);
+          }
+        });
+      }
+    } finally {
+      if (veniceProducer != null) {
+        veniceProducer.stop();
+      }
+    }
+  }
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java
index 75b1c123dce..7042716a902 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java
@@ -192,7 +192,8 @@ public void testLocalAndRemoteConsumption(boolean isTopicWiseSharedConsumerAssig
         kafkaClusterBasedRecordThrottler,
         metricsRepository,
         topicExistenceChecker,
-        pubSubDeserializer);
+        pubSubDeserializer,
+        (ignored) -> {});
     versionTopic = getTopic();
 
     int partition = 0;
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java
index 5d55be3fe6d..43c71cae9f4 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java
@@ -63,6 +63,8 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
   AggServerQuotaTokenBucketStats quotaTokenBucketStats;
   List aclInterceptors;
 
+  private boolean isDaVinciClient;
+
   public HttpChannelInitializer(
       ReadOnlyStoreRepository storeMetadataRepository,
       CompletableFuture customizedViewRepository,
@@ -75,6 +77,7 @@ public HttpChannelInitializer(
       StorageReadRequestHandler requestHandler) {
     this.serverConfig = serverConfig;
     this.requestHandler = requestHandler;
+    this.isDaVinciClient = serverConfig.isDaVinciClient();
     boolean isKeyValueProfilingEnabled = serverConfig.isKeyValueProfilingEnabled();
     boolean isUnregisterMetricForDeletedStoreEnabled = serverConfig.isUnregisterMetricForDeletedStoreEnabled();
@@ -84,19 +87,22 @@ public HttpChannelInitializer(
         RequestType.SINGLE_GET,
         isKeyValueProfilingEnabled,
         storeMetadataRepository,
-        isUnregisterMetricForDeletedStoreEnabled);
+        isUnregisterMetricForDeletedStoreEnabled,
+        isDaVinciClient);
     this.multiGetStats = new AggServerHttpRequestStats(
         metricsRepository,
         RequestType.MULTI_GET,
         isKeyValueProfilingEnabled,
         storeMetadataRepository,
-        isUnregisterMetricForDeletedStoreEnabled);
+        isUnregisterMetricForDeletedStoreEnabled,
+        isDaVinciClient);
     this.computeStats = new AggServerHttpRequestStats(
         metricsRepository,
         RequestType.COMPUTE,
         isKeyValueProfilingEnabled,
         storeMetadataRepository,
-        isUnregisterMetricForDeletedStoreEnabled);
+        isUnregisterMetricForDeletedStoreEnabled,
+        isDaVinciClient);
 
     if (serverConfig.isComputeFastAvroEnabled()) {
       LOGGER.info("Fast avro for compute is enabled");
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java
index c4349612c72..47d77dc0ebb 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java
@@ -64,6 +64,8 @@ public class ListenerService extends AbstractVeniceService {
 
   private StorageReadRequestHandler storageReadRequestHandler;
 
+  private boolean isDaVinciClient;
+
   public ListenerService(
       StorageEngineRepository storageEngineRepository,
       ReadOnlyStoreRepository storeMetadataRepository,
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/stats/AggServerHttpRequestStats.java b/services/venice-server/src/main/java/com/linkedin/venice/stats/AggServerHttpRequestStats.java
index 042b85d2f49..81e3cd80f6f 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/stats/AggServerHttpRequestStats.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/stats/AggServerHttpRequestStats.java
@@ -16,10 +16,11 @@ public AggServerHttpRequestStats(
       RequestType requestType,
       boolean isKeyValueProfilingEnabled,
       ReadOnlyStoreRepository metadataRepository,
-      boolean unregisterMetricForDeletedStoreEnabled) {
+      boolean unregisterMetricForDeletedStoreEnabled,
+      boolean isDaVinciClient) {
     super(
         metricsRepository,
-        new ServerHttpRequestStatsSupplier(requestType, isKeyValueProfilingEnabled),
+        new ServerHttpRequestStatsSupplier(requestType, isKeyValueProfilingEnabled, isDaVinciClient),
         metadataRepository,
         unregisterMetricForDeletedStoreEnabled);
   }
@@ -28,9 +29,15 @@ static class ServerHttpRequestStatsSupplier implements StatsSupplier<ServerHttpRequestStats>