diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java index 836676a4bf..e2fdcaec6a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java @@ -13,6 +13,7 @@ import com.linkedin.venice.utils.RedundantExceptionFilter; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricConfig; import io.tehuti.metrics.stats.Rate; import java.util.HashMap; @@ -66,6 +67,8 @@ class ConsumptionTask implements Runnable { private final Map messageRatePerTopicPartition = new VeniceConcurrentHashMap<>(); private final Map bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>(); private final Map pollRatePerTopicPartition = new VeniceConcurrentHashMap<>(); + + private final Lazy overallConsumerPollRate; private final RedundantExceptionFilter redundantExceptionFilter; private final Map lastSuccessfulPollTimestampPerTopicPartition = new VeniceConcurrentHashMap<>(); @@ -105,6 +108,7 @@ public ConsumptionTask( this.taskId = taskId; this.offsetLagGetter = offsetLagGetter; this.redundantExceptionFilter = redundantExceptionFilter; + this.overallConsumerPollRate = Lazy.of(() -> createRate(System.currentTimeMillis())); this.consumptionTaskIdStr = Utils.getSanitizedStringForLogger(consumerNamePrefix) + " - " + taskId; this.LOGGER = LogManager.getLogger(getClass().getSimpleName() + "[ " + consumptionTaskIdStr + " ]"); } @@ -196,6 +200,7 @@ public void run() { consumedDataReceiver.write(topicPartitionMessages); checkSlowPartitionWithHighLag(pubSubTopicPartition); } + overallConsumerPollRate.get().record(1, lastSuccessfulPollTimestamp); aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency( LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp)); aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount); @@ -302,15 +307,17 @@ private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicParti Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition); Double messageRate = getMessageRate(pubSubTopicPartition); Double pollRate = getPollRate(pubSubTopicPartition); + Double consumerPollRate = overallConsumerPollRate.get().measure(metricConfig, System.currentTimeMillis()); String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition; if (offsetLag > 200000 && messageRate < 200 && !redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) { LOGGER.warn( - "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}", + "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {},Consumer Poll Rate: {}", pubSubTopicPartition, offsetLag, messageRate, - pollRate); + pollRate, + consumerPollRate); } }