add delayed ingestion in dvc for target region push
misyel committed Feb 7, 2025
1 parent 2a34ca2 commit a7dbb18
Showing 4 changed files with 211 additions and 126 deletions.
@@ -4,16 +4,19 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ConcurrentRef;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.RegionUtils;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -240,9 +243,24 @@ synchronized void trySubscribeDaVinciFutureVersion() {
} else {
return;
}
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));

Store store = backend.getStoreRepository().getStoreOrThrow(storeName);
Set<String> targetRegions = RegionUtils.parseRegionsFilterList(store.getTargetSwapRegion());
String currentRegion = backend.getConfigLoader().getVeniceServerConfig().getRegionName();
boolean isTargetRegionEnabled = !StringUtils.isEmpty(store.getTargetSwapRegion());
boolean startIngestionInNonTargetRegion =
isTargetRegionEnabled && targetVersion.getStatus() == VersionStatus.ONLINE;

// Subscribe to the future version if:
// 1. Target region push with delayed ingestion is not enabled
// 2. Target region push with delayed ingestion is enabled and the current region is a target region
// 3. Target region push with delayed ingestion is enabled, the current region is a non-target region,
// and the wait time has elapsed. The wait time has elapsed once the version status is marked ONLINE.
if (targetRegions.contains(currentRegion) || startIngestionInNonTargetRegion || !isTargetRegionEnabled) {
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
}
}

/**
@@ -373,4 +391,8 @@ private void swapCurrentVersion() {
public StoreDeserializerCache getStoreDeserializerCache() {
return storeDeserializerCache;
}

public String getStoreName() {
return storeName;
}
}
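
The core of this change is the new guard in trySubscribeDaVinciFutureVersion(): a Da Vinci client subscribes to the future version right away only when target region push is disabled or the local region is a target region; otherwise it holds off until the version status reaches ONLINE, which signals that the post-push wait time has elapsed. Below is a minimal, self-contained sketch of that decision with hypothetical class and method names; it is an illustration, not code from this commit.

import java.util.Set;

public final class DelayedIngestionGate {
  /**
   * True when a Da Vinci client should subscribe to the future version: the
   * target-region-push feature is off, the local region is a target region, or
   * the wait time has elapsed (signalled by the version being ONLINE).
   */
  static boolean shouldSubscribe(Set<String> targetRegions, String currentRegion, boolean versionOnline) {
    boolean targetRegionPushEnabled = !targetRegions.isEmpty();
    boolean waitTimeElapsedInNonTargetRegion = targetRegionPushEnabled && versionOnline;
    return !targetRegionPushEnabled || targetRegions.contains(currentRegion) || waitTimeElapsedInNonTargetRegion;
  }

  public static void main(String[] args) {
    System.out.println(shouldSubscribe(Set.of("dc-0"), "dc-1", false)); // false: delay ingestion in non-target region
    System.out.println(shouldSubscribe(Set.of("dc-0"), "dc-0", false)); // true: target region ingests immediately
    System.out.println(shouldSubscribe(Set.of("dc-0"), "dc-1", true));  // true: wait time elapsed, version ONLINE
  }
}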
@@ -1,15 +1,25 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.LOCAL_REGION_NAME;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.StoreInfo;
@@ -19,12 +29,16 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -40,6 +54,9 @@ public class TestDeferredVersionSwap {
private static final String[] CLUSTER_NAMES =
IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);

private static final int TEST_TIMEOUT = 120_000;
private static final Logger LOGGER = LogManager.getLogger(TestDeferredVersionSwap.class);

@BeforeClass
public void setUp() {
Properties controllerProps = new Properties();
@@ -68,7 +85,7 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test
@Test(timeOut = TEST_TIMEOUT)
public void testDeferredVersionSwap() throws IOException {
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
@@ -131,4 +148,140 @@ public void testDeferredVersionSwap() throws IOException {
}
}

@Test(timeOut = TEST_TIMEOUT * 2)
public void testDvcDelayedIngestionWithTargetRegion() throws Exception {
// Setup job properties
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true);
storeParms.setTargetRegionSwapWaitTime(1);
storeParms.setTargetRegionSwap(TARGET_REGION);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
String keySchemaStr = "\"int\"";
String valueSchemaStr = "\"int\"";

// Create store + start a normal push
int keyCount = 100;
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir, keyCount);
String inputDirPath = "file://" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("testDvcDelayedIngestionWithTargetRegion");
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, valueSchemaStr, props, storeParms).close();
LOGGER.info("DvcDeferredVersionSwap starting normal push job");
TestWriteUtils.runPushJob("Test push job", props);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 1),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should be swapped in all regions
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 1);
});
});
}

// Create dvc client in target region
List<VeniceMultiClusterWrapper> childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
VeniceClusterWrapper cluster1 = childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]);
VeniceProperties backendConfig = DaVinciTestContext.getDaVinciPropertyBuilder(cluster1.getZk().getAddress())
.put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(LOCAL_REGION_NAME, TARGET_REGION)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
DaVinciClient<Object, Object> client1 =
ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster1, new DaVinciConfig(), backendConfig);
client1.subscribeAll().get();

// Check that v1 is ingested
for (int i = 1; i <= keyCount; i++) {
assertNotNull(client1.get(i).get());
}

// Do another push with target region enabled
int keyCount2 = 200;
File inputDir2 = getTempDataDirectory();
String inputDirPath2 = "file://" + inputDir2.getAbsolutePath();
TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir2, keyCount2);
Properties props2 =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath2, storeName);
try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
props2.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
LOGGER.info("DvcDeferredVersionSwap starting target region push job");
TestWriteUtils.runPushJob("Test push job", props2);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 2),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should only be swapped in the target region
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
if (colo.equals(TARGET_REGION)) {
Assert.assertEquals((int) version, 2);
} else {
Assert.assertEquals((int) version, 1);
}
});
});

// Data should be automatically ingested in target region for dvc
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNotNull(client1.get(i).get());
}
});

// Close dvc client in target region
client1.close();

// Create dvc client in non target region
VeniceClusterWrapper cluster2 = childDatacenters.get(1).getClusters().get(CLUSTER_NAMES[0]);
VeniceProperties backendConfig2 = DaVinciTestContext.getDaVinciPropertyBuilder(cluster2.getZk().getAddress())
.put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(LOCAL_REGION_NAME, "dc-1")
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
DaVinciClient<Object, Object> client2 =
ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster2, new DaVinciConfig(), backendConfig2);
client2.subscribeAll().get();

// Check that v2 is not ingested
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNull(client2.get(i).get());
}
});

// Version should be swapped in all regions
LOGGER.info("DvcDeferredVersionSwap check that target region push is complete");
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 2);
});
});

// Check that v2 is ingested in dvc non target region
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNotNull(client2.get(i).get());
}
});

client2.close();
}
}
}
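
For orientation, the knobs the new test exercises can be condensed into one helper: the store-level target-region settings and the per-push TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP flag. This is a sketch only, using the same utilities the test imports; the real test splits store creation and the targeted push into two separate jobs, and additionally stands up Da Vinci clients in both a target and a non-target region.

import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;

import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import java.util.Properties;

final class TargetRegionPushSketch {
  static void runTargetRegionPush(
      VeniceTwoLayerMultiRegionMultiClusterWrapper wrapper,
      String clusterName,
      String storeName,
      String inputDirPath,
      String targetRegion) throws Exception {
    Properties props = IntegrationTestPushUtils.defaultVPJProps(wrapper, inputDirPath, storeName);

    // Store-level knobs: swap the target region first, then wait before swapping the rest.
    UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
        .setTargetRegionSwap(targetRegion)
        .setTargetRegionSwapWaitTime(1);
    createStoreForJob(clusterName, "\"int\"", "\"int\"", props, storeParams).close();

    // Per-push knob: run this push as a targeted region push with deferred version swap.
    props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
    TestWriteUtils.runPushJob("Targeted region push with deferred swap", props);
  }
}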
@@ -6,11 +6,8 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
@@ -21,7 +18,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -79,17 +75,6 @@ private Set<String> getRegionsForVersionSwap(Map<String, Integer> candidateRegio
return remainingRegions;
}

private String getTargetRegion(Set<String> targetRegions) {
return targetRegions.iterator().next();
}

private StoreResponse getStoreForRegion(String clusterName, String targetRegion, String storeName) {
Map<String, ControllerClient> controllerClientMap =
veniceParentHelixAdmin.getVeniceHelixAdmin().getControllerClientMap(clusterName);
ControllerClient targetRegionControllerClient = controllerClientMap.get(targetRegion);
return targetRegionControllerClient.getStore(storeName);
}

private boolean didWaitTimeElapseInTargetRegions(
Map<String, Long> completionTimes,
Set<String> targetRegions,
@@ -146,7 +131,7 @@ public void run() {
storePushCompletionTimes,
targetRegions,
store.getTargetSwapRegionWaitTime())) {
LOGGER.info(
LOGGER.debug(
"Skipping version swap for store: {} on version: {} as wait time: {} has not passed",
storeName,
targetVersionNum,
@@ -159,34 +144,6 @@
veniceParentHelixAdmin.getCurrentVersionsForMultiColos(cluster, storeName);
Set<String> remainingRegions = getRegionsForVersionSwap(coloToVersions, targetRegions);

StoreResponse targetRegionStoreResponse =
getStoreForRegion(cluster, getTargetRegion(targetRegions), storeName);
if (targetRegionStoreResponse.isError()) {
LOGGER.warn("Got error when fetching targetRegionStore: {}", targetRegionStoreResponse.getError());
continue;
}

StoreInfo targetRegionStore = targetRegionStoreResponse.getStore();
Optional<Version> version = targetRegionStore.getVersion(targetVersionNum);
if (!version.isPresent()) {
LOGGER.warn(
"Unable to find version {} for store: {} in regions: {}",
targetVersionNum,
storeName,
store.getTargetSwapRegion());
continue;
}

// Do not perform version swap for davinci stores
// TODO remove this check once DVC delayed ingestion is completed
if (version.get().getIsDavinciHeartbeatReported()) {
LOGGER.info(
"Skipping version swap for store: {} on version: {} as it is davinci",
storeName,
targetVersionNum);
continue;
}

// Check that push is completed in target regions
Admin.OfflinePushStatusInfo pushStatusInfo =
veniceParentHelixAdmin.getOffLinePushStatus(cluster, kafkaTopicName);
@@ -269,7 +226,7 @@ public void run() {
store.getTargetSwapRegionWaitTime());

if (!didWaitTimeElapseInTargetRegions) {
LOGGER.info(
LOGGER.debug(
"Skipping version swap for store: {} on version: {} as wait time: {} has not passed",
storeName,
targetVersionNum,
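
The wait-time gate behind the skipped-swap log messages above can be read as a simple rule: the remaining (non-target) regions are only swapped once every target region's push-completion time plus the store's configured target-swap wait time lies in the past. The sketch below illustrates that rule; the helper name, time units, and structure are assumptions, since this commit only shows the call sites of didWaitTimeElapseInTargetRegions(), not its body.

import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

final class WaitTimeRule {
  // Illustration only: assumes completion times are epoch seconds and the wait time is in minutes.
  static boolean didWaitTimeElapse(
      Map<String, Long> pushCompletionEpochSeconds,
      Set<String> targetRegions,
      long waitTimeMinutes) {
    long now = Instant.now().getEpochSecond();
    for (String region : targetRegions) {
      Long completedAt = pushCompletionEpochSeconds.get(region);
      // Missing completion time, or completion + wait time still in the future: keep waiting.
      if (completedAt == null || completedAt + TimeUnit.MINUTES.toSeconds(waitTimeMinutes) > now) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    long fiveMinutesAgo = Instant.now().getEpochSecond() - TimeUnit.MINUTES.toSeconds(5);
    System.out.println(didWaitTimeElapse(Map.of("dc-0", fiveMinutesAgo), Set.of("dc-0"), 1)); // true
  }
}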