[da-vinci] Bug fix for peer discovery in DVC with multi-stores #4291
Annotations
4 errors
Publish Test Report:
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SITWithTWiseAndBufferAfterLeaderTest.java#L1
org.mockito.exceptions.verification.ArgumentsAreDifferent: Argument(s) are different! Wanted:
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
1,
OffsetRecord{localVersionTopicOffset=3, upstreamOffset=-1, leaderTopic=null, offsetLag=0, eventTimeEpochMs=-1, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=true, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTaskTest.lambda$testNotifier$35(StoreIngestionTaskTest.java:1619)
Actual invocations have different arguments:
storageMetadataService.getLastOffset(
"TestTopic_78e13c0dac_d5907882_v1",
1
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.getLastOffset(DeepCopyOffsetManager.java:49)
storageMetadataService.getLastOffset(
"TestTopic_78e13c0dac_d5907882_v1",
2
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.getLastOffset(DeepCopyOffsetManager.java:49)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@4c9dabdf
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
2,
OffsetRecord{localVersionTopicOffset=1, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@256b41ee
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
2,
OffsetRecord{localVersionTopicOffset=2, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
1,
OffsetRecord{localVersionTopicOffset=1, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@3dc78f93
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
1,
OffsetRecord{localVersionTopicOffset=2, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@58bddade
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataServi
|
Publish Test Report:
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SITWithTWiseWithoutBufferAfterLeaderTest.java#L1
Wanted but not invoked:
hostLevelIngestionStats.recordTotalBytesConsumed(
<any long>
);
-> at com.linkedin.davinci.stats.HostLevelIngestionStats.recordTotalBytesConsumed(HostLevelIngestionStats.java:499)
However, there were exactly 30 interactions with this mock:
hostLevelIngestionStats.recordProcessConsumerActionLatency(
7.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.004989d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.checkLongRunningTaskState(LeaderFollowerStoreIngestionTask.java:756)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.recordQuotaMetrics(StoreIngestionTask.java:1635)
hostLevelIngestionStats.recordProcessConsumerActionLatency(
0.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.001924d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.checkLongRunningTaskState(LeaderFollowerStoreIngestionTask.java:756)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.recordQuotaMetrics(StoreIngestionTask.java:1635)
hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(
0.09597800000000001d,
1738805465294L
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1302)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1307)
hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(
0.057137d,
1738805465295L
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1302)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1307)
hostLevelIngestionStats.recordTotalRecordsConsumed();
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerRecord(StoreIngestionTask.java:2621)
hostLevelIngestionStats.recordProcessConsumerActionLatency(
0.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.001684d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.checkLongRunningTaskState(LeaderFollowerStoreIngestionTask.java:756)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.recordQuotaMetrics(StoreIngestionTask.java:1635)
hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(
0.045826000000000006d,
1738805465306L
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1302)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1307)
hostLevelIngestionStats.recordTotalRecordsConsumed();
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerRecord(StoreIngestionTask.java:2621)
hostLevelIngestionStats.recordTotalRecordsConsumed();
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerRecord(StoreIngestionTask.java:2621)
hostLevelIngestionStats.recordProcessConsumerActionLatency(
0.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.002023d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerS
|
Publish Test Report:
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SITWithTWiseAndBufferAfterLeaderTest.java#L1
org.mockito.exceptions.verification.ArgumentsAreDifferent: Argument(s) are different! Wanted:
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
1,
OffsetRecord{localVersionTopicOffset=3, upstreamOffset=-1, leaderTopic=null, offsetLag=0, eventTimeEpochMs=-1, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=true, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTaskTest.lambda$testNotifier$35(StoreIngestionTaskTest.java:1619)
Actual invocations have different arguments:
storageMetadataService.getLastOffset(
"TestTopic_78e13c0dac_d5907882_v1",
1
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.getLastOffset(DeepCopyOffsetManager.java:49)
storageMetadataService.getLastOffset(
"TestTopic_78e13c0dac_d5907882_v1",
2
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.getLastOffset(DeepCopyOffsetManager.java:49)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@4c9dabdf
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
2,
OffsetRecord{localVersionTopicOffset=1, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@256b41ee
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
2,
OffsetRecord{localVersionTopicOffset=2, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
1,
OffsetRecord{localVersionTopicOffset=1, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@3dc78f93
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataService.put(
"TestTopic_78e13c0dac_d5907882_v1",
1,
OffsetRecord{localVersionTopicOffset=2, upstreamOffset=-1, leaderTopic=null, offsetLag=9223372036854775807, eventTimeEpochMs=1738805549681, latestProducerProcessingTimeInMs=0, isEndOfPushReceived=false, databaseInfo={}, realTimeProducerState={}, recordTransformerClassHash=null}
);
-> at com.linkedin.venice.offsets.DeepCopyOffsetManager.put(DeepCopyOffsetManager.java:38)
storageMetadataService.computeStoreVersionState(
"TestTopic_78e13c0dac_d5907882_v1",
com.linkedin.venice.offsets.DeepCopyStorageMetadataService$$Lambda$999/0x000000080086ec40@58bddade
);
-> at com.linkedin.venice.offsets.DeepCopyStorageMetadataService.computeStoreVersionState(DeepCopyStorageMetadataService.java:34)
storageMetadataServi
|
Publish Test Report:
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SITWithTWiseWithoutBufferAfterLeaderTest.java#L1
Wanted but not invoked:
hostLevelIngestionStats.recordTotalBytesConsumed(
<any long>
);
-> at com.linkedin.davinci.stats.HostLevelIngestionStats.recordTotalBytesConsumed(HostLevelIngestionStats.java:499)
However, there were exactly 30 interactions with this mock:
hostLevelIngestionStats.recordProcessConsumerActionLatency(
7.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.004989d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.checkLongRunningTaskState(LeaderFollowerStoreIngestionTask.java:756)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.recordQuotaMetrics(StoreIngestionTask.java:1635)
hostLevelIngestionStats.recordProcessConsumerActionLatency(
0.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.001924d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.checkLongRunningTaskState(LeaderFollowerStoreIngestionTask.java:756)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.recordQuotaMetrics(StoreIngestionTask.java:1635)
hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(
0.09597800000000001d,
1738805465294L
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1302)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1307)
hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(
0.057137d,
1738805465295L
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1302)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1307)
hostLevelIngestionStats.recordTotalRecordsConsumed();
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerRecord(StoreIngestionTask.java:2621)
hostLevelIngestionStats.recordProcessConsumerActionLatency(
0.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.001684d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.checkLongRunningTaskState(LeaderFollowerStoreIngestionTask.java:756)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.recordQuotaMetrics(StoreIngestionTask.java:1635)
hostLevelIngestionStats.recordConsumerRecordsQueuePutLatency(
0.045826000000000006d,
1738805465306L
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1302)
hostLevelIngestionStats.recordStorageQuotaUsed(
NaNd
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.produceToStoreBufferServiceOrKafka(StoreIngestionTask.java:1307)
hostLevelIngestionStats.recordTotalRecordsConsumed();
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerRecord(StoreIngestionTask.java:2621)
hostLevelIngestionStats.recordTotalRecordsConsumed();
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerRecord(StoreIngestionTask.java:2621)
hostLevelIngestionStats.recordProcessConsumerActionLatency(
0.0d
);
-> at com.linkedin.davinci.kafka.consumer.StoreIngestionTask.processConsumerActions(StoreIngestionTask.java:1995)
hostLevelIngestionStats.recordCheckLongRunningTasksLatency(
0.002023d
);
-> at com.linkedin.davinci.kafka.consumer.LeaderFollowerS
|
Loading