From 010a17c5347dc3a1bb518453faa49f072a6b6fd2 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Wed, 5 Feb 2025 17:14:37 -0800 Subject: [PATCH] Adding poll rate to see how slow partition consumption is. --- .../kafka/consumer/ConsumptionTask.java | 40 ++++++++++++++++++- .../kafka/consumer/KafkaConsumerService.java | 10 ++++- .../consumer/TopicPartitionIngestionInfo.java | 15 ++++++- .../TopicPartitionIngestionInfoTest.java | 2 +- 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java index c72d67175dc..836676a4bfc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java @@ -10,6 +10,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.utils.ExceptionUtils; import com.linkedin.venice.utils.LatencyUtils; +import com.linkedin.venice.utils.RedundantExceptionFilter; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricConfig; @@ -19,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; @@ -49,6 +51,8 @@ class ConsumptionTask implements Runnable { new VeniceConcurrentHashMap<>(); private final long readCycleDelayMs; private final Supplier>>> pollFunction; + + private final Function offsetLagGetter; private final IntConsumer bandwidthThrottler; private final IntConsumer recordsThrottler; private final AggKafkaConsumerServiceStats aggStats; @@ -61,6 +65,8 @@ class ConsumptionTask implements Runnable { */ private final Map 
messageRatePerTopicPartition = new VeniceConcurrentHashMap<>(); private final Map bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>(); + private final Map pollRatePerTopicPartition = new VeniceConcurrentHashMap<>(); + private final RedundantExceptionFilter redundantExceptionFilter; private final Map lastSuccessfulPollTimestampPerTopicPartition = new VeniceConcurrentHashMap<>(); @@ -87,7 +93,9 @@ public ConsumptionTask( final IntConsumer bandwidthThrottler, final IntConsumer recordsThrottler, final AggKafkaConsumerServiceStats aggStats, - final ConsumerSubscriptionCleaner cleaner) { + final ConsumerSubscriptionCleaner cleaner, + Function offsetLagGetter, + RedundantExceptionFilter redundantExceptionFilter) { this.readCycleDelayMs = readCycleDelayMs; this.pollFunction = pollFunction; this.bandwidthThrottler = bandwidthThrottler; @@ -95,6 +103,8 @@ public ConsumptionTask( this.aggStats = aggStats; this.cleaner = cleaner; this.taskId = taskId; + this.offsetLagGetter = offsetLagGetter; + this.redundantExceptionFilter = redundantExceptionFilter; this.consumptionTaskIdStr = Utils.getSanitizedStringForLogger(consumerNamePrefix) + " - " + taskId; this.LOGGER = LogManager.getLogger(getClass().getSimpleName() + "[ " + consumptionTaskIdStr + " ]"); } @@ -180,8 +190,11 @@ public void run() { bytesRatePerTopicPartition .computeIfAbsent(pubSubTopicPartition, tp -> createRate(lastSuccessfulPollTimestamp)) .record(payloadSizePerTopicPartition, lastSuccessfulPollTimestamp); - + pollRatePerTopicPartition + .computeIfAbsent(pubSubTopicPartition, tp -> createRate(lastSuccessfulPollTimestamp)) + .record(1, lastSuccessfulPollTimestamp); consumedDataReceiver.write(topicPartitionMessages); + checkSlowPartitionWithHighLag(pubSubTopicPartition); } aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency( LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp)); @@ -278,6 +291,29 @@ Double getByteRate(PubSubTopicPartition topicPartition) { return 0.0D; } 
+ Double getPollRate(PubSubTopicPartition topicPartition) { + if (pollRatePerTopicPartition.containsKey(topicPartition)) { + return pollRatePerTopicPartition.get(topicPartition).measure(metricConfig, System.currentTimeMillis()); + } + return 0.0D; + } + + private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicPartition) { + Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition); + Double messageRate = getMessageRate(pubSubTopicPartition); + Double pollRate = getPollRate(pubSubTopicPartition); + String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition; + if (offsetLag > 200000 && messageRate < 200 + && !redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) { + LOGGER.warn( + "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}", + pubSubTopicPartition, + offsetLag, + messageRate, + pollRate); + } + } + PubSubTopic getDestinationIdentifier(PubSubTopicPartition topicPartition) { ConsumedDataReceiver>> dataReceiver = dataReceiverMap.get(topicPartition); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 7996b271073..4676f6a2aaa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -90,7 +91,7 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1; // 4MB bitset 
size, 2 bitmaps for active and old bitset private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = - new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(10)); + new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(1)); /** * @param statsOverride injection of stats, for test purposes @@ -167,6 +168,7 @@ protected KafkaConsumerService( pubSubConsumer::batchUnsubscribe, time); + Function offsetLagGetter = pubSubConsumer::getOffsetLag; ConsumptionTask consumptionTask = new ConsumptionTask( consumerNamePrefix, i, @@ -175,7 +177,9 @@ protected KafkaConsumerService( bandwidthThrottlerFunction, recordsThrottlerFunction, this.aggStats, - cleaner); + cleaner, + offsetLagGetter, + REDUNDANT_LOGGING_FILTER); consumerToConsumptionTask.putByIndex(pubSubConsumer, consumptionTask, i); consumerToLocks.put(pubSubConsumer, new ReentrantLock()); } @@ -560,6 +564,7 @@ private Map getIngestionInfoF long latestOffset = consumer.getLatestOffset(topicPartition); double msgRate = consumptionTask.getMessageRate(topicPartition); double byteRate = consumptionTask.getByteRate(topicPartition); + double pollRate = consumptionTask.getPollRate(topicPartition); long lastSuccessfulPollTimestamp = consumptionTask.getLastSuccessfulPollTimestamp(topicPartition); long elapsedTimeSinceLastPollInMs = ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP; if (lastSuccessfulPollTimestamp != ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP) { @@ -573,6 +578,7 @@ private Map getIngestionInfoF offsetLag, msgRate, byteRate, + pollRate, consumerIdStr, elapsedTimeSinceLastPollInMs, destinationVersionTopicName); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java index 83ee8e069ad..8f26382f9f5 100644 --- 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/TopicPartitionIngestionInfo.java @@ -9,6 +9,7 @@ public class TopicPartitionIngestionInfo { private long offsetLag; private double msgRate; private double byteRate; + private double pollRate; private String consumerIdStr; private long elapsedTimeSinceLastPollInMs; @@ -20,6 +21,7 @@ public TopicPartitionIngestionInfo( @JsonProperty("offsetLag") long offsetLag, @JsonProperty("msgRate") double msgRate, @JsonProperty("byteRate") double byteRate, + @JsonProperty("pollRate") double pollRate, @JsonProperty("consumerIdStr") String consumerIdStr, @JsonProperty("elapsedTimeSinceLastPollInMs") long elapsedTimeSinceLastPollInMs, @JsonProperty("versionTopicName") String versionTopicName) { @@ -27,6 +29,7 @@ public TopicPartitionIngestionInfo( this.offsetLag = offsetLag; this.msgRate = msgRate; this.byteRate = byteRate; + this.pollRate = pollRate; this.consumerIdStr = consumerIdStr; this.elapsedTimeSinceLastPollInMs = elapsedTimeSinceLastPollInMs; this.versionTopicName = versionTopicName; @@ -60,6 +63,14 @@ public void setByteRate(double byteRate) { this.byteRate = byteRate; } + public double getPollRate() { + return pollRate; + } + + public void setPollRate(double pollRate) { + this.pollRate = pollRate; + } + public String getConsumerIdStr() { return consumerIdStr; } @@ -97,6 +108,7 @@ public boolean equals(Object o) { && this.offsetLag == topicPartitionIngestionInfo.getOffsetLag() && Double.doubleToLongBits(this.msgRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getMsgRate()) && Double.doubleToLongBits(this.byteRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getByteRate()) + && Double.doubleToLongBits(this.pollRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getPollRate()) && this.consumerIdStr.equals(topicPartitionIngestionInfo.getConsumerIdStr()) && 
this.elapsedTimeSinceLastPollInMs == topicPartitionIngestionInfo.getElapsedTimeSinceLastPollInMs() && this.versionTopicName.equals(topicPartitionIngestionInfo.getVersionTopicName()); @@ -108,6 +120,7 @@ public int hashCode() { result = 31 * result + Long.hashCode(offsetLag); result = 31 * result + Double.hashCode(msgRate); result = 31 * result + Double.hashCode(byteRate); + result = 31 * result + Double.hashCode(pollRate); result = 31 * result + consumerIdStr.hashCode(); result = 31 * result + Long.hashCode(elapsedTimeSinceLastPollInMs); result = 31 * result + versionTopicName.hashCode(); @@ -117,7 +130,7 @@ public int hashCode() { @Override public String toString() { return "{" + "latestOffset:" + latestOffset + ", offsetLag:" + offsetLag + ", msgRate:" + msgRate + ", byteRate:" - + byteRate + ", consumerIdStr:" + consumerIdStr + ", elapsedTimeSinceLastPollInMs:" + + byteRate + ", pollRate:" + pollRate + ", consumerIdStr:" + consumerIdStr + ", elapsedTimeSinceLastPollInMs:" + elapsedTimeSinceLastPollInMs + ", versionTopicName:" + versionTopicName + '}'; } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TopicPartitionIngestionInfoTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TopicPartitionIngestionInfoTest.java index a6cf8fb7878..12a1c98604d 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TopicPartitionIngestionInfoTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TopicPartitionIngestionInfoTest.java @@ -20,7 +20,7 @@ public class TopicPartitionIngestionInfoTest { public void testJsonParse() throws Exception { PubSubTopic versionTopic = pubSubTopicRepository.getTopic("test_store_v1"); TopicPartitionIngestionInfo topicPartitionIngestionInfo = - new TopicPartitionIngestionInfo(0, 1, 2.0, 4.0, "consumerIdStr", 7, versionTopic.getName()); + new TopicPartitionIngestionInfo(0, 1, 2.0, 4.0, 1.0, "consumerIdStr", 7, 
versionTopic.getName()); String kafkaUrl = "localhost:1234"; PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, 0); Map> topicPartitionIngestionContext = new HashMap<>();