Skip to content

Commit

Permalink
Adding poll rate to see how slow partition consumption is.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Feb 6, 2025
1 parent 5dfe33b commit 010a17c
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricConfig;
Expand All @@ -19,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -49,6 +51,8 @@ class ConsumptionTask implements Runnable {
new VeniceConcurrentHashMap<>();
private final long readCycleDelayMs;
private final Supplier<Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>>> pollFunction;

private final Function<PubSubTopicPartition, Long> offsetLagGetter;
private final IntConsumer bandwidthThrottler;
private final IntConsumer recordsThrottler;
private final AggKafkaConsumerServiceStats aggStats;
Expand All @@ -61,6 +65,8 @@ class ConsumptionTask implements Runnable {
*/
private final Map<PubSubTopicPartition, Rate> messageRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final Map<PubSubTopicPartition, Rate> bytesRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final Map<PubSubTopicPartition, Rate> pollRatePerTopicPartition = new VeniceConcurrentHashMap<>();
private final RedundantExceptionFilter redundantExceptionFilter;
private final Map<PubSubTopicPartition, Long> lastSuccessfulPollTimestampPerTopicPartition =
new VeniceConcurrentHashMap<>();

Expand All @@ -87,14 +93,18 @@ public ConsumptionTask(
final IntConsumer bandwidthThrottler,
final IntConsumer recordsThrottler,
final AggKafkaConsumerServiceStats aggStats,
final ConsumerSubscriptionCleaner cleaner) {
final ConsumerSubscriptionCleaner cleaner,
Function<PubSubTopicPartition, Long> offsetLagGetter,
RedundantExceptionFilter redundantExceptionFilter) {
this.readCycleDelayMs = readCycleDelayMs;
this.pollFunction = pollFunction;
this.bandwidthThrottler = bandwidthThrottler;
this.recordsThrottler = recordsThrottler;
this.aggStats = aggStats;
this.cleaner = cleaner;
this.taskId = taskId;
this.offsetLagGetter = offsetLagGetter;
this.redundantExceptionFilter = redundantExceptionFilter;
this.consumptionTaskIdStr = Utils.getSanitizedStringForLogger(consumerNamePrefix) + " - " + taskId;
this.LOGGER = LogManager.getLogger(getClass().getSimpleName() + "[ " + consumptionTaskIdStr + " ]");
}
Expand Down Expand Up @@ -180,8 +190,11 @@ public void run() {
bytesRatePerTopicPartition
.computeIfAbsent(pubSubTopicPartition, tp -> createRate(lastSuccessfulPollTimestamp))
.record(payloadSizePerTopicPartition, lastSuccessfulPollTimestamp);

pollRatePerTopicPartition
.computeIfAbsent(pubSubTopicPartition, tp -> createRate(lastSuccessfulPollTimestamp))
.record(1, lastSuccessfulPollTimestamp);
consumedDataReceiver.write(topicPartitionMessages);
checkSlowPartitionWithHighLag(pubSubTopicPartition);
}
aggStats.recordTotalConsumerRecordsProducingToWriterBufferLatency(
LatencyUtils.getElapsedTimeFromMsToMs(beforeProducingToWriteBufferTimestamp));
Expand Down Expand Up @@ -278,6 +291,29 @@ Double getByteRate(PubSubTopicPartition topicPartition) {
return 0.0D;
}

/**
 * Measures the poll rate recorded for the given topic partition.
 *
 * @param topicPartition the topic partition to look up
 * @return the rate measured at the current wall-clock time, or {@code 0.0} when no poll rate entry exists
 *         for this topic partition
 */
Double getPollRate(PubSubTopicPartition topicPartition) {
  // Single lookup instead of containsKey() + get(): if the entry were removed between the two calls,
  // the chained measure() would be invoked on null.
  Rate pollRate = pollRatePerTopicPartition.get(topicPartition);
  if (pollRate == null) {
    return 0.0D;
  }
  return pollRate.measure(metricConfig, System.currentTimeMillis());
}

/** Offset lag that must be exceeded before a partition is considered for the slow-partition warning. */
private static final long SLOW_PARTITION_OFFSET_LAG_THRESHOLD = 200_000L;

/** Message rate below which a partition with high lag is reported as slow. */
private static final double SLOW_PARTITION_MESSAGE_RATE_THRESHOLD = 200.0D;

/**
 * Logs a warning when the given topic partition has an offset lag above
 * {@link #SLOW_PARTITION_OFFSET_LAG_THRESHOLD} while its message rate is below
 * {@link #SLOW_PARTITION_MESSAGE_RATE_THRESHOLD}. The warning is skipped when the redundant exception
 * filter reports the task/partition key as redundant.
 *
 * @param pubSubTopicPartition the topic partition whose lag and rates are inspected
 */
private void checkSlowPartitionWithHighLag(PubSubTopicPartition pubSubTopicPartition) {
  Long offsetLag = offsetLagGetter.apply(pubSubTopicPartition);
  // Unboxing a null lag would throw; without a lag value there is nothing to report.
  if (offsetLag == null || offsetLag <= SLOW_PARTITION_OFFSET_LAG_THRESHOLD) {
    return;
  }

  double messageRate = getMessageRate(pubSubTopicPartition);
  // Written as a negated "<" so that a NaN rate is still treated as "not slow".
  if (!(messageRate < SLOW_PARTITION_MESSAGE_RATE_THRESHOLD)) {
    return;
  }

  // Build the filter key and measure the poll rate only once both thresholds are crossed, so the common
  // (healthy) path does no string concatenation or extra rate measurement.
  String slowTaskWithPartitionStr = consumptionTaskIdStr + " - " + pubSubTopicPartition;
  if (redundantExceptionFilter.isRedundantException(slowTaskWithPartitionStr)) {
    return;
  }

  LOGGER.warn(
      "Slow partition with high lag detected: {}. Lag: {}, Message Rate: {}, Poll Rate: {}",
      pubSubTopicPartition,
      offsetLag,
      messageRate,
      getPollRate(pubSubTopicPartition));
}

PubSubTopic getDestinationIdentifier(PubSubTopicPartition topicPartition) {
ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> dataReceiver =
dataReceiverMap.get(topicPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -90,7 +91,7 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService
private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1;
// 4MB bitset size, 2 bitmaps for active and old bitset
private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(10));
new RedundantExceptionFilter(8 * 1024 * 1024 * 4, TimeUnit.MINUTES.toMillis(1));

/**
* @param statsOverride injection of stats, for test purposes
Expand Down Expand Up @@ -167,6 +168,7 @@ protected KafkaConsumerService(
pubSubConsumer::batchUnsubscribe,
time);

Function<PubSubTopicPartition, Long> offsetLagGetter = pubSubConsumer::getOffsetLag;
ConsumptionTask consumptionTask = new ConsumptionTask(
consumerNamePrefix,
i,
Expand All @@ -175,7 +177,9 @@ protected KafkaConsumerService(
bandwidthThrottlerFunction,
recordsThrottlerFunction,
this.aggStats,
cleaner);
cleaner,
offsetLagGetter,
REDUNDANT_LOGGING_FILTER);
consumerToConsumptionTask.putByIndex(pubSubConsumer, consumptionTask, i);
consumerToLocks.put(pubSubConsumer, new ReentrantLock());
}
Expand Down Expand Up @@ -560,6 +564,7 @@ private Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoF
long latestOffset = consumer.getLatestOffset(topicPartition);
double msgRate = consumptionTask.getMessageRate(topicPartition);
double byteRate = consumptionTask.getByteRate(topicPartition);
double pollRate = consumptionTask.getPollRate(topicPartition);
long lastSuccessfulPollTimestamp = consumptionTask.getLastSuccessfulPollTimestamp(topicPartition);
long elapsedTimeSinceLastPollInMs = ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP;
if (lastSuccessfulPollTimestamp != ConsumptionTask.DEFAULT_TOPIC_PARTITION_NO_POLL_TIMESTAMP) {
Expand All @@ -573,6 +578,7 @@ private Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoF
offsetLag,
msgRate,
byteRate,
pollRate,
consumerIdStr,
elapsedTimeSinceLastPollInMs,
destinationVersionTopicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class TopicPartitionIngestionInfo {
private long offsetLag;
private double msgRate;
private double byteRate;
private double pollRate;
private String consumerIdStr;
private long elapsedTimeSinceLastPollInMs;

Expand All @@ -20,13 +21,15 @@ public TopicPartitionIngestionInfo(
@JsonProperty("offsetLag") long offsetLag,
@JsonProperty("msgRate") double msgRate,
@JsonProperty("byteRate") double byteRate,
@JsonProperty("pollRate") double pollRate,
@JsonProperty("consumerIdStr") String consumerIdStr,
@JsonProperty("elapsedTimeSinceLastPollInMs") long elapsedTimeSinceLastPollInMs,
@JsonProperty("versionTopicName") String versionTopicName) {
this.latestOffset = latestOffset;
this.offsetLag = offsetLag;
this.msgRate = msgRate;
this.byteRate = byteRate;
this.pollRate = pollRate;
this.consumerIdStr = consumerIdStr;
this.elapsedTimeSinceLastPollInMs = elapsedTimeSinceLastPollInMs;
this.versionTopicName = versionTopicName;
Expand Down Expand Up @@ -60,6 +63,14 @@ public void setByteRate(double byteRate) {
this.byteRate = byteRate;
}

/**
 * @return the poll rate held by this ingestion info
 */
public double getPollRate() {
  return this.pollRate;
}

/**
 * Replaces the poll rate held by this ingestion info.
 *
 * @param pollRate the new poll rate value
 */
public void setPollRate(final double pollRate) {
  this.pollRate = pollRate;
}

/**
 * @return the consumer id string held by this ingestion info
 */
public String getConsumerIdStr() {
  return this.consumerIdStr;
}
Expand Down Expand Up @@ -97,6 +108,7 @@ public boolean equals(Object o) {
&& this.offsetLag == topicPartitionIngestionInfo.getOffsetLag()
&& Double.doubleToLongBits(this.msgRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getMsgRate())
&& Double.doubleToLongBits(this.byteRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getByteRate())
&& Double.doubleToLongBits(this.pollRate) == Double.doubleToLongBits(topicPartitionIngestionInfo.getPollRate())
&& this.consumerIdStr.equals(topicPartitionIngestionInfo.getConsumerIdStr())
&& this.elapsedTimeSinceLastPollInMs == topicPartitionIngestionInfo.getElapsedTimeSinceLastPollInMs()
&& this.versionTopicName.equals(topicPartitionIngestionInfo.getVersionTopicName());
Expand All @@ -108,6 +120,7 @@ public int hashCode() {
result = 31 * result + Long.hashCode(offsetLag);
result = 31 * result + Double.hashCode(msgRate);
result = 31 * result + Double.hashCode(byteRate);
result = 31 * result + Double.hashCode(pollRate);
result = 31 * result + consumerIdStr.hashCode();
result = 31 * result + Long.hashCode(elapsedTimeSinceLastPollInMs);
result = 31 * result + versionTopicName.hashCode();
Expand All @@ -117,7 +130,7 @@ public int hashCode() {
/**
 * Renders the ingestion info as a brace-delimited list of {@code name:value} pairs, including the poll
 * rate directly after the byte rate.
 */
@Override
public String toString() {
  // The stale pre-change concatenation line is dropped: keeping it next to its replacement printed
  // byteRate twice and put the wrong value after the elapsedTimeSinceLastPollInMs label.
  return "{" + "latestOffset:" + latestOffset + ", offsetLag:" + offsetLag + ", msgRate:" + msgRate + ", byteRate:"
      + byteRate + ", pollRate:" + pollRate + ", consumerIdStr:" + consumerIdStr + ", elapsedTimeSinceLastPollInMs:"
      + elapsedTimeSinceLastPollInMs + ", versionTopicName:" + versionTopicName + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class TopicPartitionIngestionInfoTest {
public void testJsonParse() throws Exception {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic("test_store_v1");
TopicPartitionIngestionInfo topicPartitionIngestionInfo =
new TopicPartitionIngestionInfo(0, 1, 2.0, 4.0, "consumerIdStr", 7, versionTopic.getName());
new TopicPartitionIngestionInfo(0, 1, 2.0, 4.0, 1.0, "consumerIdStr", 7, versionTopic.getName());
String kafkaUrl = "localhost:1234";
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, 0);
Map<String, Map<String, TopicPartitionIngestionInfo>> topicPartitionIngestionContext = new HashMap<>();
Expand Down

0 comments on commit 010a17c

Please sign in to comment.