Skip to content

Commit 3065958

Browse files
author
Hao Xu
committed
Add overall consumer poll rate.
1 parent 010a17c commit 3065958

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.linkedin.venice.utils.RedundantExceptionFilter;
1414
import com.linkedin.venice.utils.Utils;
1515
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
16+
import com.linkedin.venice.utils.lazy.Lazy;
1617
import io.tehuti.metrics.MetricConfig;
1718
import io.tehuti.metrics.stats.Rate;
1819
import java.util.HashMap;
@@ -66,6 +67,8 @@ class ConsumptionTask implements Runnable {
6667
private final Map<PubSubTopicPartition, Rate> messageRatePerTopicPartition = new VeniceConcurrentHashMap<>();
6768
private final Map<PubSubTopicPartition, Rate> bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>();
6869
private final Map<PubSubTopicPartition, Rate> pollRatePerTopicPartition = new VeniceConcurrentHashMap<>();
70+
71+
private final Lazy<Rate> overallConsumerPollRate;
6972
private final RedundantExceptionFilter redundantExceptionFilter;
7073
private final Map<PubSubTopicPartition, Long> lastSuccessfulPollTimestampPerTopicPartition =
7174
new VeniceConcurrentHashMap<>();
@@ -105,6 +108,7 @@ public ConsumptionTask(
105108
this.taskId = taskId;
106109
this.offsetLagGetter = offsetLagGetter;
107110
this.redundantExceptionFilter = redundantExceptionFilter;
111+
this.overallConsumerPollRate = Lazy.of(() -> createRate(System.currentTimeMillis()));
108112
this.consumptionTaskIdStr = Utils.getSanitizedStringForLogger(consumerNamePrefix) + " - " + taskId;
109113
this.LOGGER = LogManager.getLogger(getClass().getSimpleName() + "[ " + consumptionTaskIdStr + " ]");
110114
}
@@ -196,6 +200,7 @@ public void run() {
196200
consumedDataReceiver.write(topicPartitionMessages);
197201
checkSlowPartitionWithHighLag(pubSubTopicPartition);
198202
}
203+
overallConsumerPollRate.get().record(1, lastSuccessfulPollTimestamp);
199204
aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency(
200205
LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp));
201206
aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount);
@@ -302,15 +307,17 @@ private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicParti
302307
Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition);
303308
Double messageRate = getMessageRate(pubSubTopicPartition);
304309
Double pollRate = getPollRate(pubSubTopicPartition);
310+
Double consumerPollRate = overallConsumerPollRate.get().measure(metricConfig, System.currentTimeMillis());
305311
String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition;
306312
if (offsetLag > 200000 && messageRate < 200
307313
&& !redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) {
308314
LOGGER.warn(
309-
"Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}",
315+
"Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {},Consumer Poll Rate: {}",
310316
pubSubTopicPartition,
311317
offsetLag,
312318
messageRate,
313-
pollRate);
319+
pollRate,
320+
consumerPollRate);
314321
}
315322
}
316323

0 commit comments

Comments
 (0)