
Commit fe1500f

Hao Xu (haoxu07) and Hao Xu authored
[server] Enhance the producer timestamp fetch logic to include both data and heartbeat messages. (#1422)
Update the topic metadata producer timestamp fetch logic to include heartbeat messages. Previously, the getProducerTimestampOfLastDataMessage function only considered data messages when retrieving the producer timestamp from the most recent messages. However, for real-time topics without data ingestion, the last messages could be heartbeat control messages, causing the ready-to-serve check to search endlessly as new heartbeat messages kept arriving.

Co-authored-by: Hao Xu <[email protected]>
1 parent cd915b9 commit fe1500f
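
The fix comes down to one extra condition in the reverse scan over the most recently consumed records: a heartbeat control message is now accepted as a valid source of the producer timestamp, so the search can terminate even when a real-time topic has received nothing but heartbeats. Below is a minimal sketch of that selection logic, assuming the same Venice types that appear in the diffs further down (PubSubMessage, KafkaKey, KafkaMessageEnvelope; their imports are omitted, and lastProducerTimestamp is an illustrative helper name, not a method in this commit):

import java.util.Arrays;
import java.util.List;

// Simplified sketch: walk the consumed records backwards and return the producer
// timestamp of the first record that is either a data message or a heartbeat
// control message. Venice types are assumed to be on the classpath.
static long lastProducerTimestamp(List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records) {
  for (int i = records.size() - 1; i >= 0; i--) {
    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = records.get(i);
    boolean isHeartbeat = Arrays.equals(record.getKey().getKey(), KafkaKey.HEART_BEAT.getKey());
    if (!record.getKey().isControlMessage() || isHeartbeat) {
      // producer timestamp, not the broker/pubsub timestamp
      return record.getValue().getProducerMetadata().getMessageTimestamp();
    }
  }
  // mirrors the "No data message found" failure path in the real code
  throw new IllegalStateException("No data or heartbeat message found in topic-partition");
}

Comparing key bytes with Arrays.equals against KafkaKey.HEART_BEAT identifies heartbeats even though they are control messages, which is exactly the case the old check rejected.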

2 files changed: +18 −2 lines changed

Diff for: internal/venice-common/src/main/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcher.java

+2 −1
@@ -504,7 +504,8 @@ long getProducerTimestampOfLastDataMessage(PubSubTopicPartition pubSubTopicParti
     // iterate in reverse order to find the first data message (not control message) from the end
     for (int i = lastConsumedRecords.size() - 1; i >= 0; i--) {
       PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = lastConsumedRecords.get(i);
-      if (!record.getKey().isControlMessage()) {
+      if (!record.getKey().isControlMessage()
+          || Arrays.equals(record.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
         stats.recordLatency(GET_PRODUCER_TIMESTAMP_OF_LAST_DATA_MESSAGE, startTime);
         // note that the timestamp is the producer timestamp and not the pubsub message (broker) timestamp
         return record.getValue().getProducerMetadata().getMessageTimestamp();
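
Per the commit message, the caller affected by the endless search is the ready-to-serve check, which repeatedly fetches this timestamp. Purely as a hypothetical illustration of how such a caller might consume the now-bounded result (topicMetadataFetcher, allowedLagMs, and isCaughtUp are made-up names for this sketch, not Venice APIs from this commit):

long producerTs = topicMetadataFetcher.getProducerTimestampOfLastDataMessage(topicPartition);
// Hypothetical staleness check: treat the partition as caught up if the last
// data-or-heartbeat producer timestamp falls within an allowed lag window.
boolean isCaughtUp = (System.currentTimeMillis() - producerTs) <= allowedLagMs;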

Diff for: internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java

+16 −1
@@ -383,6 +383,15 @@ public void testConsumeLatestRecords() {
     verify(consumerMock, times(2)).unSubscribe(eq(topicPartition));
   }

+  private PubSubMessage getHeartBeatPubSubMessage(PubSubTopicPartition topicPartition, long offset) {
+    KafkaKey key = KafkaKey.HEART_BEAT;
+    KafkaMessageEnvelope val = mock(KafkaMessageEnvelope.class);
+    ProducerMetadata producerMetadata = new ProducerMetadata();
+    producerMetadata.setMessageTimestamp(System.nanoTime());
+    when(val.getProducerMetadata()).thenReturn(producerMetadata);
+    return new ImmutablePubSubMessage(key, val, topicPartition, offset, System.currentTimeMillis(), 512);
+  }
+
   private PubSubMessage getPubSubMessage(PubSubTopicPartition topicPartition, boolean isControlMessage, long offset) {
     KafkaKey key = mock(KafkaKey.class);
     when(key.isControlMessage()).thenReturn(isControlMessage);
@@ -403,14 +412,20 @@ public void testGetProducerTimestampOfLastDataMessage() {
     assertEquals(timestamp, PUBSUB_NO_PRODUCER_TIME_IN_EMPTY_TOPIC_PARTITION);
     verify(metadataFetcherSpy, times(1)).consumeLatestRecords(eq(topicPartition), anyInt());

-    // test when there are no data messages to consume
+    // test when there are no data messages and heartbeat messages to consume
     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> cm = getPubSubMessage(topicPartition, true, 5);
     doReturn(Collections.singletonList(cm)).when(metadataFetcherSpy).consumeLatestRecords(eq(topicPartition), anyInt());
     Throwable t = expectThrows(
         VeniceException.class,
         () -> metadataFetcherSpy.getProducerTimestampOfLastDataMessage(topicPartition));
     assertTrue(t.getMessage().contains("No data message found in topic-partition"));

+    // test when there are heartbeat messages but no data messages to consume
+    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> hm = getHeartBeatPubSubMessage(topicPartition, 6);
+    doReturn(Collections.singletonList(hm)).when(metadataFetcherSpy).consumeLatestRecords(eq(topicPartition), anyInt());
+    timestamp = metadataFetcherSpy.getProducerTimestampOfLastDataMessage(topicPartition);
+    assertEquals(timestamp, hm.getValue().getProducerMetadata().getMessageTimestamp());
+
     // test when there are data messages to consume
     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> dm0 = getPubSubMessage(topicPartition, false, 4);
     doReturn(Collections.singletonList(dm0)).when(metadataFetcherSpy)
