diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 635bcb1d8db..68ff9c40cda 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -4,16 +4,19 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; import com.linkedin.venice.serialization.StoreDeserializerCache; import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.ConcurrentRef; import com.linkedin.venice.utils.ReferenceCounted; +import com.linkedin.venice.utils.RegionUtils; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -240,9 +243,24 @@ synchronized void trySubscribeDaVinciFutureVersion() { } else { return; } - LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName()); - setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats)); - daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e)); + + Store store = backend.getStoreRepository().getStoreOrThrow(storeName); + Set targetRegions = RegionUtils.parseRegionsFilterList(store.getTargetSwapRegion()); + String currentRegion = backend.getConfigLoader().getVeniceServerConfig().getRegionName(); + boolean isTargetRegionEnabled = !StringUtils.isEmpty(store.getTargetSwapRegion()); + boolean startIngestionInNonTargetRegion = + isTargetRegionEnabled && targetVersion.getStatus() == VersionStatus.ONLINE; + + // Subscribe to the future version if: + // 1. Target region push with delayed ingestion is not enabled + // 2. Target region push with delayed ingestion is enabled and the current region is a target region + // 3. Target region push with delayed ingestion is enabled and the current region is a non target region + // and the wait time has elapsed. The wait time has elapsed when the version status is marked ONLINE + if (targetRegions.contains(currentRegion) || startIngestionInNonTargetRegion || !isTargetRegionEnabled) { + LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName()); + setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats)); + daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e)); + } } /** @@ -373,4 +391,8 @@ private void swapCurrentVersion() { public StoreDeserializerCache getStoreDeserializerCache() { return storeDeserializerCache; } + + public String getStoreName() { + return storeName; + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java index cdc9d3edaa9..c2d1f5431f8 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDeferredVersionSwap.java @@ -1,15 +1,25 @@ package com.linkedin.venice.endToEnd; +import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED; import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS; +import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH; +import static com.linkedin.venice.ConfigKeys.LOCAL_REGION_NAME; import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA; import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import com.linkedin.davinci.client.DaVinciClient; +import com.linkedin.davinci.client.DaVinciConfig; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.integration.utils.DaVinciTestContext; import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.meta.StoreInfo; @@ -19,12 +29,16 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -40,6 +54,9 @@ public class TestDeferredVersionSwap { private static final String[] CLUSTER_NAMES = IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); + private static final int TEST_TIMEOUT = 120_000; + private static final Logger LOGGER = LogManager.getLogger(TestDeferredVersionSwap.class); + @BeforeClass public void setUp() { Properties controllerProps = new Properties(); @@ -68,7 +85,7 @@ public void cleanUp() { Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); } - @Test + @Test(timeOut = TEST_TIMEOUT) public void testDeferredVersionSwap() throws IOException { File inputDir = getTempDataDirectory(); TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100); @@ -131,4 +148,140 @@ public void testDeferredVersionSwap() throws IOException { } } + @Test(timeOut = TEST_TIMEOUT * 2) + public void testDvcDelayedIngestionWithTargetRegion() throws Exception { + // Setup job properties + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true); + storeParms.setTargetRegionSwapWaitTime(1); + storeParms.setTargetRegionSwap(TARGET_REGION); + String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString(); + String keySchemaStr = "\"int\""; + String valueSchemaStr = "\"int\""; + + // Create store + start a normal push + int keyCount = 100; + File inputDir = getTempDataDirectory(); + TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir, keyCount); + String inputDirPath = "file://" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("testDvcDelayedIngestionWithTargetRegion"); + Properties props = + IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName); + try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) { + createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, valueSchemaStr, props, storeParms).close(); + LOGGER.info("DvcDeferredVersionSwap starting normal push job"); + TestWriteUtils.runPushJob("Test push job", props); + TestUtils.waitForNonDeterministicPushCompletion( + Version.composeKafkaTopic(storeName, 1), + parentControllerClient, + 30, + TimeUnit.SECONDS); + + // Version should only be swapped in all regions + TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + Assert.assertEquals((int) version, 1); + }); + }); + } + + // Create dvc client in target region + List childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); + VeniceClusterWrapper cluster1 = childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]); + VeniceProperties backendConfig = DaVinciTestContext.getDaVinciPropertyBuilder(cluster1.getZk().getAddress()) + .put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(LOCAL_REGION_NAME, TARGET_REGION) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + DaVinciClient client1 = + ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster1, new DaVinciConfig(), backendConfig); + client1.subscribeAll().get(); + + // Check that v1 is ingested + for (int i = 1; i <= keyCount; i++) { + assertNotNull(client1.get(i).get()); + } + + // Do another push with target region enabled + int keyCount2 = 200; + File inputDir2 = getTempDataDirectory(); + String inputDirPath2 = "file://" + inputDir2.getAbsolutePath(); + TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir2, keyCount2); + Properties props2 = + IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath2, storeName); + try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) { + props2.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + LOGGER.info("DvcDeferredVersionSwap starting target region push job"); + TestWriteUtils.runPushJob("Test push job", props2); + TestUtils.waitForNonDeterministicPushCompletion( + Version.composeKafkaTopic(storeName, 2), + parentControllerClient, + 30, + TimeUnit.SECONDS); + + // Version should only be swapped in the target region + TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + if (colo.equals(TARGET_REGION)) { + Assert.assertEquals((int) version, 2); + } else { + Assert.assertEquals((int) version, 1); + } + }); + }); + + // Data should be automatically ingested in target region for dvc + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + for (int i = 101; i <= keyCount2; i++) { + assertNotNull(client1.get(i).get()); + } + }); + + // Close dvc client in target region + client1.close(); + + // Create dvc client in non target region + VeniceClusterWrapper cluster2 = childDatacenters.get(1).getClusters().get(CLUSTER_NAMES[0]); + VeniceProperties backendConfig2 = DaVinciTestContext.getDaVinciPropertyBuilder(cluster2.getZk().getAddress()) + .put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath()) + .put(LOCAL_REGION_NAME, "dc-1") + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .build(); + DaVinciClient client2 = + ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster2, new DaVinciConfig(), backendConfig2); + client2.subscribeAll().get(); + + // Check that v2 is not ingested + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + for (int i = 101; i <= keyCount2; i++) { + assertNull(client2.get(i).get()); + } + }); + + // Version should be swapped in all regions + LOGGER.info("DvcDeferredVersionSwap check that target region push is complete"); + TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + Assert.assertEquals((int) version, 2); + }); + }); + + // Check that v2 is ingested in dvc non target region + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + for (int i = 101; i <= keyCount2; i++) { + assertNotNull(client2.get(i).get()); + } + }); + + client2.close(); + } + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java index 427c68ab24e..c755253e74b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/DeferredVersionSwapService.java @@ -6,11 +6,8 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.linkedin.venice.controller.stats.DeferredVersionSwapStats; -import com.linkedin.venice.controllerapi.ControllerClient; -import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.Store; -import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.pushmonitor.ExecutionStatus; @@ -21,7 +18,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -79,17 +75,6 @@ private Set getRegionsForVersionSwap(Map candidateRegio return remainingRegions; } - private String getTargetRegion(Set targetRegions) { - return targetRegions.iterator().next(); - } - - private StoreResponse getStoreForRegion(String clusterName, String targetRegion, String storeName) { - Map controllerClientMap = - veniceParentHelixAdmin.getVeniceHelixAdmin().getControllerClientMap(clusterName); - ControllerClient targetRegionControllerClient = controllerClientMap.get(targetRegion); - return targetRegionControllerClient.getStore(storeName); - } - private boolean didWaitTimeElapseInTargetRegions( Map completionTimes, Set targetRegions, @@ -146,7 +131,7 @@ public void run() { storePushCompletionTimes, targetRegions, store.getTargetSwapRegionWaitTime())) { - LOGGER.info( + LOGGER.debug( "Skipping version swap for store: {} on version: {} as wait time: {} has not passed", storeName, targetVersionNum, @@ -159,34 +144,6 @@ public void run() { veniceParentHelixAdmin.getCurrentVersionsForMultiColos(cluster, storeName); Set remainingRegions = getRegionsForVersionSwap(coloToVersions, targetRegions); - StoreResponse targetRegionStoreResponse = - getStoreForRegion(cluster, getTargetRegion(targetRegions), storeName); - if (targetRegionStoreResponse.isError()) { - LOGGER.warn("Got error when fetching targetRegionStore: {}", targetRegionStoreResponse.getError()); - continue; - } - - StoreInfo targetRegionStore = targetRegionStoreResponse.getStore(); - Optional version = targetRegionStore.getVersion(targetVersionNum); - if (!version.isPresent()) { - LOGGER.warn( - "Unable to find version {} for store: {} in regions: {}", - targetVersionNum, - storeName, - store.getTargetSwapRegion()); - continue; - } - - // Do not perform version swap for davinci stores - // TODO remove this check once DVC delayed ingestion is completed - if (version.get().getIsDavinciHeartbeatReported()) { - LOGGER.info( - "Skipping version swap for store: {} on version: {} as it is davinci", - storeName, - targetVersionNum); - continue; - } - // Check that push is completed in target regions Admin.OfflinePushStatusInfo pushStatusInfo = veniceParentHelixAdmin.getOffLinePushStatus(cluster, kafkaTopicName); @@ -269,7 +226,7 @@ public void run() { store.getTargetSwapRegionWaitTime()); if (!didWaitTimeElapseInTargetRegions) { - LOGGER.info( + LOGGER.debug( "Skipping version swap for store: {} on version: {} as wait time: {} has not passed", storeName, targetVersionNum, diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java index c1720bef6da..4f64d4f1409 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestDeferredVersionSwapService.java @@ -8,13 +8,9 @@ import static org.mockito.Mockito.verify; import com.linkedin.venice.controller.stats.DeferredVersionSwapStats; -import com.linkedin.venice.controllerapi.ControllerClient; -import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.Store; -import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.utils.TestUtils; @@ -117,23 +113,21 @@ private Admin.OfflinePushStatusInfo getOfflinePushStatusInfo( public void testDeferredVersionSwap() throws Exception { Map versions = new HashMap<>(); int targetVersionNum = 3; - int davinciVersionNum = 2; + int currentVersionNum = 2; int completedVersionNum = 1; versions.put(completedVersionNum, VersionStatus.ONLINE); - versions.put(davinciVersionNum, VersionStatus.PUSHED); + versions.put(currentVersionNum, VersionStatus.PUSHED); versions.put(targetVersionNum, VersionStatus.PUSHED); String storeName1 = "testStore"; - String storeName2 = "testStore2"; - String storeName3 = "testStore3"; - String storeName4 = "testStore4"; + String storeName2 = "testStore3"; + String storeName3 = "testStore4"; + String storeName4 = "testStore5"; String storeName5 = "testStore5"; - String storeName6 = "testStore5"; - Store store1 = mockStore(davinciVersionNum, 60, region1, versions, storeName1); - Store store2 = mockStore(completedVersionNum, 60, region1, versions, storeName2); - Store store3 = mockStore(davinciVersionNum, 60, region1, versions, storeName3); - Store store4 = mockStore(davinciVersionNum, 60, region1, versions, storeName4); - Store store5 = mockStore(completedVersionNum, 60, region1, versions, storeName5); - Store store6 = mockStore(davinciVersionNum, 60, region1, versions, storeName6); + Store store1 = mockStore(currentVersionNum, 60, region1, versions, storeName1); + Store store2 = mockStore(currentVersionNum, 60, region1, versions, storeName2); + Store store3 = mockStore(currentVersionNum, 60, region1, versions, storeName3); + Store store4 = mockStore(completedVersionNum, 60, region1, versions, storeName4); + Store store5 = mockStore(currentVersionNum, 60, region1, versions, storeName5); Long time = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC); List storeList = new ArrayList<>(); @@ -142,32 +136,15 @@ public void testDeferredVersionSwap() throws Exception { storeList.add(store3); storeList.add(store4); storeList.add(store5); - storeList.add(store6); doReturn(storeList).when(admin).getAllStores(clusterName); doReturn(3).when(store1).getLargestUsedVersionNumber(); - doReturn(2).when(store2).getLargestUsedVersionNumber(); + doReturn(3).when(store2).getLargestUsedVersionNumber(); doReturn(3).when(store3).getLargestUsedVersionNumber(); - doReturn(3).when(store4).getLargestUsedVersionNumber(); - doReturn(1).when(store5).getLargestUsedVersionNumber(); + doReturn(1).when(store4).getLargestUsedVersionNumber(); doReturn(3).when(store5).getLargestUsedVersionNumber(); - Version davinciVersion = new VersionImpl(storeName2, davinciVersionNum); - Version targetVersion = new VersionImpl(storeName1, targetVersionNum); - davinciVersion.setIsDavinciHeartbeatReported(true); - - ControllerClient controllerClient = mock(ControllerClient.class); VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); - Map controllerClientMap = new HashMap<>(); - controllerClientMap.put(region1, controllerClient); - StoreResponse storeResponse = new StoreResponse(); - StoreInfo storeInfo = new StoreInfo(); - List versionList = new ArrayList<>(); - versionList.add(targetVersion); - versionList.add(davinciVersion); - storeInfo.setVersions(versionList); - storeResponse.setStore(storeInfo); - doReturn(storeResponse).when(controllerClient).getStore(any()); doReturn(veniceHelixAdmin).when(admin).getVeniceHelixAdmin(); HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class); @@ -175,20 +152,15 @@ public void testDeferredVersionSwap() throws Exception { doReturn(repository).when(resources).getStoreMetadataRepository(); doReturn(resources).when(veniceHelixAdmin).getHelixVeniceClusterResources(clusterName); doReturn(store1).when(repository).getStore(storeName1); - doReturn(controllerClientMap).when(veniceHelixAdmin).getControllerClientMap(clusterName); Map coloToVersions = new HashMap<>(); - Map davinciColoToVersions = new HashMap<>(); coloToVersions.put(region1, 3); coloToVersions.put(region2, 2); - davinciColoToVersions.put(region1, 2); - davinciColoToVersions.put(region2, 1); doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName1); - doReturn(davinciColoToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName2); + doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName2); doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName3); - doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName4); - doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName6); + doReturn(coloToVersions).when(admin).getCurrentVersionsForMultiColos(clusterName, storeName5); Admin.OfflinePushStatusInfo offlinePushStatusInfoWithWaitTimeElapsed = getOfflinePushStatusInfo( ExecutionStatus.COMPLETED.toString(), @@ -212,12 +184,10 @@ public void testDeferredVersionSwap() throws Exception { time - TimeUnit.MINUTES.toSeconds(30)); String kafkaTopicName1 = Version.composeKafkaTopic(storeName1, targetVersionNum); - String kafkaTopicName2 = Version.composeKafkaTopic(storeName2, davinciVersionNum); - String kafkaTopicName3 = Version.composeKafkaTopic(storeName3, targetVersionNum); - String kafkaTopicName4 = Version.composeKafkaTopic(storeName4, targetVersionNum); - String kafkaTopicName6 = Version.composeKafkaTopic(storeName6, targetVersionNum); + String kafkaTopicName3 = Version.composeKafkaTopic(storeName2, targetVersionNum); + String kafkaTopicName4 = Version.composeKafkaTopic(storeName3, targetVersionNum); + String kafkaTopicName6 = Version.composeKafkaTopic(storeName5, targetVersionNum); doReturn(offlinePushStatusInfoWithWaitTimeElapsed).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName1); - doReturn(offlinePushStatusInfoWithWaitTimeElapsed).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName2); doReturn(offlinePushStatusInfoWithoutWaitTimeElapsed).when(admin) .getOffLinePushStatus(clusterName, kafkaTopicName3); doReturn(offlinePushStatusInfoWithOngoingPush).when(admin).getOffLinePushStatus(clusterName, kafkaTopicName4); @@ -233,20 +203,17 @@ public void testDeferredVersionSwap() throws Exception { // push completed in target region & wait time elapsed verify(admin, atLeast(1)).rollForwardToFutureVersion(clusterName, storeName1, region2); - // davinci store - verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName2, region2); - // push completed in target region & wait time NOT elapsed - verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName3, region2); + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName2, region2); // push not completed in target region - verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName4, region2); + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName3, region2); // push is complete in all regions - verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName5, region2); + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName4, region2); // push failed in non target region - verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName6, region2); + verify(admin, never()).rollForwardToFutureVersion(clusterName, storeName5, region2); }); } @@ -254,10 +221,10 @@ public void testDeferredVersionSwap() throws Exception { public void testDeferredVersionSwapNonTargetRegionStatuses() throws Exception { Map versions = new HashMap<>(); int targetVersionNum = 3; - int davinciVersionNum = 2; + int currentVersionNum = 2; int completedVersionNum = 1; versions.put(completedVersionNum, VersionStatus.ONLINE); - versions.put(davinciVersionNum, VersionStatus.PUSHED); + versions.put(currentVersionNum, VersionStatus.PUSHED); versions.put(targetVersionNum, VersionStatus.PUSHED); String storeName1 = "testStore"; String storeName2 = "testStore2"; @@ -265,12 +232,12 @@ public void testDeferredVersionSwapNonTargetRegionStatuses() throws Exception { String storeName4 = "testStore4"; String storeName5 = "testStore5"; String storeName6 = "testStore6"; - Store store1 = mockStore(davinciVersionNum, 60, region1, versions, storeName1); - Store store2 = mockStore(davinciVersionNum, 60, region1, versions, storeName2); - Store store3 = mockStore(davinciVersionNum, 60, region1, versions, storeName3); - Store store4 = mockStore(davinciVersionNum, 60, region1, versions, storeName4); - Store store5 = mockStore(davinciVersionNum, 60, region1, versions, storeName5); - Store store6 = mockStore(davinciVersionNum, 60, region1 + "," + region2, versions, storeName6); + Store store1 = mockStore(currentVersionNum, 60, region1, versions, storeName1); + Store store2 = mockStore(currentVersionNum, 60, region1, versions, storeName2); + Store store3 = mockStore(currentVersionNum, 60, region1, versions, storeName3); + Store store4 = mockStore(currentVersionNum, 60, region1, versions, storeName4); + Store store5 = mockStore(currentVersionNum, 60, region1, versions, storeName5); + Store store6 = mockStore(currentVersionNum, 60, region1 + "," + region2, versions, storeName6); Long time = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC); List storeList = new ArrayList<>(); @@ -288,21 +255,8 @@ public void testDeferredVersionSwapNonTargetRegionStatuses() throws Exception { doReturn(3).when(store5).getLargestUsedVersionNumber(); doReturn(3).when(store6).getLargestUsedVersionNumber(); - Version targetVersion = new VersionImpl(storeName1, targetVersionNum); - - ControllerClient controllerClient = mock(ControllerClient.class); VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); - Map controllerClientMap = new HashMap<>(); - controllerClientMap.put(region1, controllerClient); - controllerClientMap.put(region2, controllerClient); - StoreResponse storeResponse = new StoreResponse(); - StoreInfo storeInfo = new StoreInfo(); - List versionList = new ArrayList<>(); - versionList.add(targetVersion); - storeInfo.setVersions(versionList); - storeResponse.setStore(storeInfo); - - doReturn(storeResponse).when(controllerClient).getStore(any()); + ; doReturn(veniceHelixAdmin).when(admin).getVeniceHelixAdmin(); HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class); @@ -310,7 +264,6 @@ public void testDeferredVersionSwapNonTargetRegionStatuses() throws Exception { doReturn(repository).when(resources).getStoreMetadataRepository(); doReturn(resources).when(veniceHelixAdmin).getHelixVeniceClusterResources(clusterName); doReturn(store1).when(repository).getStore(storeName1); - doReturn(controllerClientMap).when(veniceHelixAdmin).getControllerClientMap(clusterName); Map coloToVersions = new HashMap<>(); coloToVersions.put(region1, 3);