Skip to content

Commit

Permalink
Add overall consumer poll rate.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Feb 7, 2025
1 parent 010a17c commit d525264
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class ConsumptionTask implements Runnable {
private final Map<PubSubTopicPartition, Rate> messageRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final Map<PubSubTopicPartition, Rate> bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final Map<PubSubTopicPartition, Rate> pollRatePerTopicPartition = new VeniceConcurrentHashMap<>();

private final Rate overallConsumerPollRate = createRate(System.currentTimeMillis());
private final RedundantExceptionFilter redundantExceptionFilter;
private final Map<PubSubTopicPartition, Long> lastSuccessfulPollTimestampPerTopicPartition =
new VeniceConcurrentHashMap<>();
Expand Down Expand Up @@ -196,6 +198,7 @@ public void run() {
consumedDataReceiver.write(topicPartitionMessages);
checkSlowPartitionWithHighLag(pubSubTopicPartition);
}
overallConsumerPollRate.record(1, lastSuccessfulPollTimestamp);
aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency(
LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp));
aggStats.recordTotalNonZeroPollResultNum(polledPubSubMessagesCount);
Expand Down Expand Up @@ -302,15 +305,17 @@ private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicParti
Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition);
Double messageRate = getMessageRate(pubSubTopicPartition);
Double pollRate = getPollRate(pubSubTopicPartition);
Double consumerPollRate = overallConsumerPollRate.measure(metricConfig, System.currentTimeMillis());
String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition;
if (offsetLag > 200000 && messageRate < 200
&& !redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) {
LOGGER.warn(
"Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}",
"Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {},Consumer Poll Rate: {}",
pubSubTopicPartition,
offsetLag,
messageRate,
pollRate);
pollRate,
consumerPollRate);
}
}

Expand Down

0 comments on commit d525264

Please sign in to comment.