Skip to content

Commit e9c05ff

Browse files
authored
[server][misc] Move leader->follower heartbeat tracking action to after transition complete (#1681)
The bug happens is: When a leader transits into follower, we will first clear leader HB entry and initialize follower HB entry, and then submit request to transition into follower to KafkaStoreIngestionService. The problem is, in between, the leader will need to drain all the consumed messages before turning into follower and start consuming VT. In between, if we see any leader HB message, it will be recorded into the leader entry, but then this will remain as a stale leader heartbeat and get recorded.
1 parent dcc89e8 commit e9c05ff

File tree

22 files changed

+145
-64
lines changed

22 files changed

+145
-64
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModel.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,6 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte
136136
@Transition(to = HelixState.STANDBY_STATE, from = HelixState.LEADER_STATE)
137137
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
138138
LeaderSessionIdChecker checker = new LeaderSessionIdChecker(leaderSessionId.incrementAndGet(), leaderSessionId);
139-
heartbeatMonitoringService
140-
.updateLagMonitor(message.getResourceName(), getPartition(), HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR);
141139
executeStateTransition(
142140
message,
143141
context,

clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import com.linkedin.davinci.store.AbstractStorageEngine;
1212
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
1313
import com.linkedin.venice.meta.Store;
14+
import com.linkedin.venice.meta.StoreVersionInfo;
1415
import com.linkedin.venice.meta.Version;
1516
import com.linkedin.venice.offsets.OffsetRecord;
16-
import com.linkedin.venice.utils.Pair;
1717
import com.linkedin.venice.utils.Utils;
1818
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
1919
import java.util.Map;
@@ -55,10 +55,10 @@ public DefaultIngestionBackend(
5555
public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition) {
5656
String storeVersion = storeConfig.getStoreVersionName();
5757
LOGGER.info("Retrieving storage engine for store {} partition {}", storeVersion, partition);
58-
Pair<Store, Version> storeAndVersion =
58+
StoreVersionInfo storeAndVersion =
5959
Utils.waitStoreVersionOrThrow(storeVersion, getStoreIngestionService().getMetadataRepo());
6060
Supplier<StoreVersionState> svsSupplier = () -> storageMetadataService.getStoreVersionState(storeVersion);
61-
syncStoreVersionConfig(storeAndVersion.getFirst(), storeConfig);
61+
syncStoreVersionConfig(storeAndVersion.getStore(), storeConfig);
6262

6363
Runnable runnable = () -> {
6464
AbstractStorageEngine storageEngine =
@@ -79,7 +79,7 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition
7979
storeVersion,
8080
partition);
8181
};
82-
if (!storeAndVersion.getFirst().isBlobTransferEnabled() || blobTransferManager == null) {
82+
if (!storeAndVersion.getStore().isBlobTransferEnabled() || blobTransferManager == null) {
8383
runnable.run();
8484
} else {
8585
// Open store for lag check and later metadata update for offset/StoreVersionState
@@ -91,8 +91,8 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition
9191
: BlobTransferTableFormat.BLOCK_BASED_TABLE;
9292

9393
CompletionStage<Void> bootstrapFuture = bootstrapFromBlobs(
94-
storeAndVersion.getFirst(),
95-
storeAndVersion.getSecond().getNumber(),
94+
storeAndVersion.getStore(),
95+
storeAndVersion.getVersion().getNumber(),
9696
partition,
9797
requestTableFormat,
9898
serverConfig.getBlobTransferDisabledOffsetLagThreshold());

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import com.linkedin.venice.meta.ServerAdminAction;
5151
import com.linkedin.venice.meta.Store;
5252
import com.linkedin.venice.meta.StoreDataChangedListener;
53+
import com.linkedin.venice.meta.StoreVersionInfo;
5354
import com.linkedin.venice.meta.Version;
5455
import com.linkedin.venice.meta.VersionStatus;
5556
import com.linkedin.venice.offsets.OffsetRecord;
@@ -81,7 +82,6 @@
8182
import com.linkedin.venice.utils.DaemonThreadFactory;
8283
import com.linkedin.venice.utils.DiskUsage;
8384
import com.linkedin.venice.utils.LatencyUtils;
84-
import com.linkedin.venice.utils.Pair;
8585
import com.linkedin.venice.utils.SystemTime;
8686
import com.linkedin.venice.utils.Time;
8787
import com.linkedin.venice.utils.Utils;
@@ -563,10 +563,10 @@ private StoreIngestionTask createStoreIngestionTask(
563563
String storeName = Version.parseStoreFromKafkaTopicName(veniceStoreVersionConfig.getStoreVersionName());
564564
int versionNumber = Version.parseVersionFromKafkaTopicName(veniceStoreVersionConfig.getStoreVersionName());
565565

566-
Pair<Store, Version> storeVersionPair =
566+
StoreVersionInfo storeVersionPair =
567567
Utils.waitStoreVersionOrThrow(veniceStoreVersionConfig.getStoreVersionName(), metadataRepo);
568-
Store store = storeVersionPair.getFirst();
569-
Version version = storeVersionPair.getSecond();
568+
Store store = storeVersionPair.getStore();
569+
Version version = storeVersionPair.getVersion();
570570

571571
BooleanSupplier isVersionCurrent = () -> {
572572
try {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,11 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
566566
partitionConsumptionState.setLeaderFollowerState(STANDBY);
567567
updateLeaderTopicOnFollower(partitionConsumptionState);
568568
}
569+
// Make sure we stop consuming from leader upstream before we switch heartbeat monitoring.
570+
getHeartbeatMonitoringService().updateLagMonitor(
571+
topicName,
572+
partitionConsumptionState.getPartition(),
573+
HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR);
569574
LOGGER.info("Replica: {} moved to standby/follower state", partitionConsumptionState.getReplicaId());
570575

571576
/**

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import com.linkedin.davinci.stats.HeartbeatMonitoringServiceStats;
77
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
88
import com.linkedin.venice.meta.Store;
9+
import com.linkedin.venice.meta.StoreVersionInfo;
910
import com.linkedin.venice.meta.Version;
1011
import com.linkedin.venice.meta.VersionImpl;
1112
import com.linkedin.venice.service.AbstractVeniceService;
12-
import com.linkedin.venice.utils.Pair;
1313
import com.linkedin.venice.utils.Utils;
1414
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
1515
import io.tehuti.metrics.MetricConfig;
@@ -294,10 +294,10 @@ public void updateLagMonitor(
294294
try {
295295
String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
296296
int storeVersion = Version.parseVersionFromKafkaTopicName(resourceName);
297-
Pair<Store, Version> res =
297+
StoreVersionInfo res =
298298
getMetadataRepository().waitVersion(storeName, storeVersion, getMaxWaitForVersionInfo(), 200);
299-
Store store = res.getFirst();
300-
Version version = res.getSecond();
299+
Store store = res.getStore();
300+
Version version = res.getVersion();
301301
if (store == null) {
302302
LOGGER.error(
303303
"Failed to get store for resource: {} with trigger: {}. Will not update lag monitor.",

clients/da-vinci-client/src/test/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModelTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ public void testUpdateLagMonitor() {
8585

8686
// LEADER->STANDBY
8787
leaderFollowerPartitionStateModelSpy.onBecomeStandbyFromLeader(message, context);
88-
verify(heartbeatMonitoringService)
88+
verify(heartbeatMonitoringService, never())
8989
.updateLagMonitor(eq(resourceName), eq(partition), eq(HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR));
9090

9191
// OFFLINE->STANDBY
9292
leaderFollowerPartitionStateModelSpy.onBecomeStandbyFromOffline(message, context);
93-
verify(heartbeatMonitoringService, times(2))
93+
verify(heartbeatMonitoringService, times(1))
9494
.updateLagMonitor(eq(resourceName), eq(partition), eq(HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR));
9595

9696
// STANDBY->OFFLINE

clients/da-vinci-client/src/test/java/com/linkedin/davinci/helix/VeniceLeaderFollowerStateModelTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
1818
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
1919
import com.linkedin.venice.meta.Store;
20+
import com.linkedin.venice.meta.StoreVersionInfo;
2021
import com.linkedin.venice.meta.Version;
2122
import com.linkedin.venice.meta.VersionImpl;
22-
import com.linkedin.venice.utils.Pair;
2323
import io.tehuti.metrics.MetricsRepository;
2424
import java.time.Duration;
2525
import java.util.HashSet;
@@ -134,7 +134,7 @@ public void testWhenBecomeOfflineFromStandbyWithVersionDeletion() {
134134
when(mockIngestionBackend.stopConsumption(any(VeniceStoreVersionConfig.class), eq(testPartition)))
135135
.thenReturn(CompletableFuture.completedFuture(null));
136136
when(mockReadOnlyStoreRepository.waitVersion(eq(storeName), eq(version), any(), anyLong()))
137-
.thenReturn(Pair.create(mockStore, null));
137+
.thenReturn(new StoreVersionInfo(mockStore, null));
138138
testStateModel.onBecomeOfflineFromStandby(mockMessage, mockContext);
139139
verify(spyHeartbeatMonitoringService).removeLagMonitor(any(), eq(testPartition));
140140
}

clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
2828
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
2929
import com.linkedin.venice.meta.Store;
30+
import com.linkedin.venice.meta.StoreVersionInfo;
3031
import com.linkedin.venice.meta.Version;
3132
import com.linkedin.venice.offsets.OffsetRecord;
32-
import com.linkedin.venice.utils.Pair;
3333
import java.io.InputStream;
3434
import java.time.Duration;
3535
import java.util.concurrent.CompletableFuture;
@@ -82,7 +82,7 @@ public void setUp() {
8282
MockitoAnnotations.openMocks(this);
8383
when(store.getName()).thenReturn(STORE_NAME);
8484
when(version.getNumber()).thenReturn(VERSION_NUMBER);
85-
Pair<Store, Version> storeAndVersion = Pair.create(store, version);
85+
StoreVersionInfo storeAndVersion = new StoreVersionInfo(store, version);
8686

8787
when(storeConfig.getStoreVersionName()).thenReturn(STORE_VERSION);
8888
when(storeIngestionService.getMetadataRepo()).thenReturn(metadataRepo);

clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import com.linkedin.venice.exceptions.VeniceTimeoutException;
3131
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
3232
import com.linkedin.venice.meta.Store;
33+
import com.linkedin.venice.meta.StoreVersionInfo;
3334
import com.linkedin.venice.meta.Version;
34-
import com.linkedin.venice.utils.Pair;
3535
import com.linkedin.venice.utils.TestUtils;
3636
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
3737
import java.util.HashMap;
@@ -57,7 +57,7 @@ public void testBackendCanDirectCommandCorrectly() {
5757
KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class);
5858
ReadOnlyStoreRepository repository = mock(ReadOnlyStoreRepository.class);
5959
when(repository.waitVersion(anyString(), anyInt(), any()))
60-
.thenReturn(Pair.create(mock(Store.class), mock(Version.class)));
60+
.thenReturn(new StoreVersionInfo(mock(Store.class), mock(Version.class)));
6161
when(storeIngestionService.getMetadataRepo()).thenReturn(repository);
6262
when(storeIngestionService.isPartitionConsuming(topic, partition)).thenReturn(true);
6363
when(backend.getStoreIngestionService()).thenReturn(storeIngestionService);
@@ -122,7 +122,7 @@ public void testBackendCanHandleErrorCorrectly() {
122122
KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class);
123123
ReadOnlyStoreRepository repository = mock(ReadOnlyStoreRepository.class);
124124
when(repository.waitVersion(anyString(), anyInt(), any()))
125-
.thenReturn(Pair.create(mock(Store.class), mock(Version.class)));
125+
.thenReturn(new StoreVersionInfo(mock(Store.class), mock(Version.class)));
126126
when(storeIngestionService.getMetadataRepo()).thenReturn(repository);
127127
when(storeIngestionService.isPartitionConsuming(topic, partition)).thenReturn(true);
128128
when(backend.getStoreIngestionService()).thenReturn(storeIngestionService);
@@ -171,7 +171,7 @@ public void testBackendCanHandleErrorCorrectly() {
171171
*/
172172
executionFlag.set(0);
173173
topicIngestionStatusMap.clear();
174-
when(repository.waitVersion(anyString(), anyInt(), any())).thenReturn(Pair.create(null, null));
174+
when(repository.waitVersion(anyString(), anyInt(), any())).thenReturn(new StoreVersionInfo(null, null));
175175

176176
Assert.assertThrows(VeniceException.class, () -> {
177177
backend.executeCommandWithRetry(topic, partition, START_CONSUMPTION, () -> {

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.linkedin.venice.meta.ReadStrategy;
4242
import com.linkedin.venice.meta.RoutingStrategy;
4343
import com.linkedin.venice.meta.Store;
44+
import com.linkedin.venice.meta.StoreVersionInfo;
4445
import com.linkedin.venice.meta.Version;
4546
import com.linkedin.venice.meta.VersionImpl;
4647
import com.linkedin.venice.meta.VersionStatus;
@@ -59,7 +60,6 @@
5960
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
6061
import com.linkedin.venice.service.ICProvider;
6162
import com.linkedin.venice.utils.DataProviderUtils;
62-
import com.linkedin.venice.utils.Pair;
6363
import com.linkedin.venice.utils.TestUtils;
6464
import com.linkedin.venice.utils.VeniceProperties;
6565
import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager;
@@ -314,9 +314,9 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
314314
doReturn(toBeDeletedStore).when(mockMetadataRepo).getStore(deletedStoreName);
315315
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
316316
doReturn(toBeDeletedStore).when(mockMetadataRepo).getStoreOrThrow(deletedStoreName);
317-
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
317+
doReturn(new StoreVersionInfo(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
318318
.waitVersion(eq(storeName), eq(1), any());
319-
doReturn(new Pair<>(toBeDeletedStore, toBeDeletedStore.getVersion(1))).when(mockMetadataRepo)
319+
doReturn(new StoreVersionInfo(toBeDeletedStore, toBeDeletedStore.getVersion(1))).when(mockMetadataRepo)
320320
.waitVersion(eq(deletedStoreName), eq(1), any());
321321
VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);
322322
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topic1, veniceProperties), 0);
@@ -330,7 +330,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
330330
0,
331331
"Expecting an empty set since all ingesting topics have version status of ONLINE");
332332
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id"));
333-
doReturn(new Pair<>(mockStore, mockStore.getVersion(2))).when(mockMetadataRepo)
333+
doReturn(new StoreVersionInfo(mockStore, mockStore.getVersion(2))).when(mockMetadataRepo)
334334
.waitVersion(eq(storeName), eq(2), any());
335335
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topic2, veniceProperties), 0);
336336
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(invalidTopic, veniceProperties), 0);
@@ -398,7 +398,7 @@ public void testCloseStoreIngestionTask() {
398398
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
399399
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
400400
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
401-
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
401+
doReturn(new StoreVersionInfo(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
402402
.waitVersion(eq(storeName), eq(1), any());
403403
VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);
404404
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topicName, veniceProperties), 0);
@@ -470,7 +470,7 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest
470470
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
471471
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
472472
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
473-
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
473+
doReturn(new StoreVersionInfo(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
474474
.waitVersion(eq(storeName), eq(1), any());
475475
VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);
476476
VeniceStoreVersionConfig config = new VeniceStoreVersionConfig(topicName, veniceProperties);
@@ -640,7 +640,7 @@ public void testCentralizedIdleIngestionTaskCleanupService() {
640640
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id"));
641641
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
642642
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
643-
doReturn(new Pair<>(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
643+
doReturn(new StoreVersionInfo(mockStore, mockStore.getVersion(1))).when(mockMetadataRepo)
644644
.waitVersion(eq(storeName), eq(1), any());
645645
VeniceProperties veniceProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);
646646
VeniceStoreVersionConfig config = new VeniceStoreVersionConfig(topicName, veniceProperties);

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
2727
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
2828
import com.linkedin.davinci.stats.HostLevelIngestionStats;
29+
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
2930
import com.linkedin.davinci.storage.StorageMetadataService;
3031
import com.linkedin.davinci.storage.StorageService;
3132
import com.linkedin.davinci.store.view.MaterializedViewWriter;
@@ -212,6 +213,7 @@ public void setUp() throws InterruptedException {
212213
.setServerConfig(mockVeniceServerConfig)
213214
.setPubSubTopicRepository(pubSubTopicRepository)
214215
.setVeniceViewWriterFactory(mockVeniceViewWriterFactory)
216+
.setHeartbeatMonitoringService(mock(HeartbeatMonitoringService.class))
215217
.setCompressorFactory(new StorageEngineBackedCompressorFactory(inMemoryStorageMetadataService))
216218
.setHostLevelIngestionStats(aggHostLevelIngestionStats);
217219
when(builder.getSchemaRepo().getKeySchema(storeName)).thenReturn(new SchemaEntry(1, "\"string\""));

clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import com.linkedin.venice.meta.HybridStoreConfigImpl;
2929
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
3030
import com.linkedin.venice.meta.Store;
31+
import com.linkedin.venice.meta.StoreVersionInfo;
3132
import com.linkedin.venice.meta.Version;
3233
import com.linkedin.venice.meta.VersionImpl;
3334
import com.linkedin.venice.utils.DataProviderUtils;
34-
import com.linkedin.venice.utils.Pair;
3535
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
3636
import io.tehuti.metrics.MetricsRepository;
3737
import java.time.Duration;
@@ -442,7 +442,7 @@ public void testUpdateLagMonitor() {
442442

443443
// 1. Test when both store and version are null
444444
when(metadataRepo.waitVersion(eq(storeName), eq(storeVersion), any(Duration.class), anyLong()))
445-
.thenReturn(Pair.create(null, null));
445+
.thenReturn(new StoreVersionInfo(null, null));
446446
heartbeatMonitoringService.updateLagMonitor(resourceName, partition, HeartbeatLagMonitorAction.SET_LEADER_MONITOR);
447447
verify(metadataRepo).waitVersion(eq(storeName), eq(storeVersion), any(Duration.class), anyLong());
448448
verify(heartbeatMonitoringService, never()).addLeaderLagMonitor(any(Version.class), anyInt());
@@ -458,7 +458,7 @@ public void testUpdateLagMonitor() {
458458

459459
// 2. Test when store is not null and version is null
460460
when(metadataRepo.waitVersion(eq(storeName), eq(storeVersion), any(Duration.class), anyLong()))
461-
.thenReturn(Pair.create(store, null));
461+
.thenReturn(new StoreVersionInfo(store, null));
462462
heartbeatMonitoringService.updateLagMonitor(resourceName, partition, HeartbeatLagMonitorAction.SET_LEADER_MONITOR);
463463
verify(metadataRepo, times(4)).waitVersion(eq(storeName), eq(storeVersion), any(Duration.class), anyLong());
464464
verify(heartbeatMonitoringService, never()).addLeaderLagMonitor(any(Version.class), anyInt());
@@ -474,7 +474,7 @@ public void testUpdateLagMonitor() {
474474

475475
// 3. Test both store and version are not null
476476
when(metadataRepo.waitVersion(eq(storeName), eq(storeVersion), any(Duration.class), anyLong()))
477-
.thenReturn(Pair.create(store, version));
477+
.thenReturn(new StoreVersionInfo(store, version));
478478
heartbeatMonitoringService.updateLagMonitor(resourceName, partition, HeartbeatLagMonitorAction.SET_LEADER_MONITOR);
479479
verify(metadataRepo, times(7)).waitVersion(eq(storeName), eq(storeVersion), any(Duration.class), anyLong());
480480
verify(heartbeatMonitoringService).addLeaderLagMonitor(version, partition);

0 commit comments

Comments
 (0)