From d5252640aece96ea6028a54f66f5e1e460200b1e Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Fri, 7 Feb 2025 11:59:48 -0800 Subject: [PATCH] Add overall consumer poll rate. --- .../linkedin/davinci/kafka/consumer/ConsumptionTask.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java index 836676a4bfc..8bcec6f9565 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java @@ -66,6 +66,8 @@ class ConsumptionTask implements Runnable { private final Map messageRatePerTopicPartition = new VeniceConcurrentHashMap<>(); private final Map bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>(); private final Map pollRatePerTopicPartition = new VeniceConcurrentHashMap<>(); + + private final Rate overallConsumerPollRate = createRate(System.currentTimeMillis()); private final RedundantExceptionFilter redundantExceptionFilter; private final Map lastSuccessfulPollTimestampPerTopicPartition = new VeniceConcurrentHashMap<>(); @@ -196,6 +198,7 @@ public void run() { consumedDataReceiver.write(topicPartitionMessages); checkSlowPartitionWithHighLag(pubSubTopicPartition); } + overallConsumerPollRate.record(1, lastSuccessfulPollTimestamp); aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency( LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp)); aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount); @@ -302,15 +305,17 @@ private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicParti Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition); Double messageRate = getMessageRate(pubSubTopicPartition); Double pollRate = getPollRate(pubSubTopicPartition); + 
Double consumerPollRate = overallConsumerPollRate.measure(metricConfig, System.currentTimeMillis()); String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition; if (offsetLag > 200000 && messageRate < 200 && !redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) { LOGGER.warn( - "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}", + "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}, Consumer Poll Rate: {}", pubSubTopicPartition, offsetLag, messageRate, - pollRate); + pollRate, + consumerPollRate); } }