Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CONCURRENT_PUSH_DETECTION_STRATEGY;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_MIN_CLEANUP_DELAY_MS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_REPLICA_REDUCTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_RETENTION_BASED_CLEANUP_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE;
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.linkedin.venice.controller.StoreBackupVersionCleanupService;
Expand All @@ -20,6 +26,7 @@
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
Expand All @@ -28,6 +35,8 @@
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -41,7 +50,7 @@ public class TestStoreBackupVersionDeletion extends AbstractMultiRegionTest {

/**
 * Number of child regions to bring up for this test class. Two regions are required because
 * the deferred-swap kill scenario in this class looks up both {@code dc-0} and {@code dc-1}.
 */
@Override
protected int getNumberOfRegions() {
  return 2;
}

@Override
Expand All @@ -65,6 +74,14 @@ protected Properties getExtraControllerProperties() {
controllerProps.put(CONTROLLER_BACKUP_VERSION_MIN_CLEANUP_DELAY_MS, 10);
controllerProps.put(CONTROLLER_BACKUP_VERSION_RETENTION_BASED_CLEANUP_ENABLED, "true");
controllerProps.put(CONTROLLER_BACKUP_VERSION_REPLICA_REDUCTION_ENABLED, "true");
// Required for the prior-current preservation test which uses a target-region push w/ deferred swap.
controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, 100);
controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED, true);
// Use parent-version-status-based concurrent-push detection so a KILLED prior version
// releases the future-version guard at VeniceParentHelixAdmin:1964. The default TOPIC_BASED_ONLY
// strategy always treats target-region-deferred-swap topics as "in flight" regardless of KILLED
// status (VeniceParentHelixAdmin:1481-1487), which would block the v3 push in this scenario.
controllerProps.put(CONCURRENT_PUSH_DETECTION_STRATEGY, "PARENT_VERSION_STATUS_ONLY");
return controllerProps;
}

Expand Down Expand Up @@ -138,6 +155,149 @@ private static String describeStore(Store store) {
+ store.getVersions().stream().map(v -> "v" + v.getNumber()).collect(java.util.stream.Collectors.joining(","));
}

/**
* Reproduces VENG-12676 end-to-end: push v1, push v2 with target-region={@code dc-0} and deferred
* swap, kill v2 before dc-1 swaps, push v3, verify dc-1 cleanup preserves v1 (prior current) and
* reaps the lingering v2. dc-1's child kill handler hits the bootstrap-completed early-return at
* VeniceHelixAdmin.killOfflinePush:8649 and leaves v2 at PUSHED — the upstream bug that produces
* the broken state. If that bug is fixed in a separate change, the v2-still-PUSHED-after-kill
* assertion below will fail and this test will need to inject the lingering state differently.
Comment thread
misyel marked this conversation as resolved.
*/
@Test(timeOut = TEST_TIMEOUT * 2)
public void testCleanupPreservesPriorCurrentVersionAcrossDeferredSwapKill() throws IOException {
final String targetRegion = "dc-0";
final String testRegion = "dc-1";
int targetIdx = -1;
int testIdx = -1;
for (int i = 0; i < childDatacenters.size(); i++) {
if (targetRegion.equals(childDatacenters.get(i).getRegionName())) {
targetIdx = i;
} else if (testRegion.equals(childDatacenters.get(i).getRegionName())) {
testIdx = i;
}
}
Assert.assertTrue(targetIdx >= 0 && testIdx >= 0, "Expected dc-0 and dc-1 in childDatacenters");
VeniceHelixAdmin testRegionAdmin =
(VeniceHelixAdmin) childDatacenters.get(testIdx).getControllers().values().iterator().next().getVeniceAdmin();

File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
String inputDirPath = "file://" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("store");
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
String keySchemaStr = "\"string\"";
UpdateStoreQueryParams storeParms =
new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true).setTargetRegionSwapWaitTime(60);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();

try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerURLs)) {
createStoreForJob(CLUSTER_NAME, keySchemaStr, NAME_RECORD_V3_SCHEMA.toString(), props, storeParms).close();

IntegrationTestPushUtils.runVPJ(props);
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
Store store = testRegionAdmin.getStore(CLUSTER_NAME, storeName);
assertEquals(store.getCurrentVersion(), 1, "v1 should be current in " + testRegion);
});

Properties v2Props = (Properties) props.clone();
v2Props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
v2Props.put(TARGETED_REGION_PUSH_LIST, targetRegion);
AtomicReference<Throwable> v2VpjError = new AtomicReference<>();
Thread v2Thread = new Thread(() -> {
try {
IntegrationTestPushUtils.runVPJ(v2Props);
} catch (Throwable t) {
v2VpjError.set(t);
}
});
Comment thread
misyel marked this conversation as resolved.
v2Thread.setDaemon(true);
v2Thread.start();

try {
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
Store store = testRegionAdmin.getStore(CLUSTER_NAME, storeName);
Version v2 = store.getVersion(2);
assertNotNull(v2, "v2 should exist in " + testRegion + ". " + describeStore(store));
Assert.assertEquals(
v2.getStatus(),
VersionStatus.PUSHED,
"v2 should be PUSHED (deferred swap) in " + testRegion + ". " + describeStore(store));
assertEquals(
store.getCurrentVersion(),
1,
"current should still be v1 in " + testRegion + " (deferred). " + describeStore(store));
});

parentControllerClient.killOfflinePushJob(Version.composeKafkaTopic(storeName, 2));

// Parent honors KILL; waiting for KILLED at parent releases the future-version guard at
// VeniceParentHelixAdmin:1964 so the subsequent v3 push can proceed.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
com.linkedin.venice.meta.StoreInfo parentStore = parentControllerClient.getStore(storeName).getStore();
Assert.assertEquals(parentStore.getVersion(2).get().getStatus(), VersionStatus.KILLED);
Comment thread
misyel marked this conversation as resolved.
Outdated
});

// Child kill no-op (the upstream bug): v2 stays PUSHED in dc-1 because the resource
// already finished bootstrapping. If this fails, the kill handler has been fixed and the
// test needs to inject the lingering state differently.
TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> {
Store store = testRegionAdmin.getStore(CLUSTER_NAME, storeName);
Version v2 = store.getVersion(2);
assertNotNull(v2, "v2 should still exist in " + testRegion + " after kill. " + describeStore(store));
Assert.assertEquals(
v2.getStatus(),
VersionStatus.PUSHED,
"v2 should remain PUSHED in " + testRegion + " after kill. " + describeStore(store));
});
} finally {
v2Thread.join(5_000);
if (v2Thread.isAlive()) {
v2Thread.interrupt();
v2Thread.join(5_000);
}
// v2 push was killed mid-flight, so the VPJ thread is expected to surface an error.
// Asserting non-null guards against a silent success that would mask kill regressions.
Assert.assertNotNull(
v2VpjError.get(),
"v2 VPJ should have failed due to the parent kill, but completed without error");
}
Comment thread
misyel marked this conversation as resolved.

IntegrationTestPushUtils.runVPJ(props);
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
Store store = testRegionAdmin.getStore(CLUSTER_NAME, storeName);
Version v3 = store.getVersion(3);
assertNotNull(v3, "v3 should exist in " + testRegion + ". " + describeStore(store));
assertEquals(
store.getCurrentVersion(),
3,
"v3 should be current in " + testRegion + ". " + describeStore(store));
// v2 never became current in dc-1, so v3.previousCurrentVersion auto-stamps to 1.
assertEquals(
v3.getPreviousCurrentVersion(),
1,
"v3.previousCurrentVersion should be 1 in " + testRegion + ". " + describeStore(store));
});

TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
Store store = testRegionAdmin.getStore(CLUSTER_NAME, storeName);
assertNotNull(
store.getVersion(1),
"v1 (prior current) must be preserved in " + testRegion + ". " + describeStore(store));
assertNull(
store.getVersion(2),
"v2 (stale PUSHED lingering) should be cleaned up in " + testRegion + ". " + describeStore(store));
assertNotNull(
store.getVersion(3),
"v3 (current) must be preserved in " + testRegion + ". " + describeStore(store));
assertEquals(store.getCurrentVersion(), 3, "current should still be v3. " + describeStore(store));
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testBackupVersionReplicaReduction() throws IOException {
File inputDir = getTempDataDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,20 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
if (isCurrentVersionRepushed) { // keep the oldest
readyToBeRemovedVersions.remove(readyToBeRemovedVersions.size() - 1);
} else {
readyToBeRemovedVersions.remove(0); // keep the newest version
// Prefer keeping the prior current; fall back to keep-newest when it's unset (pre-v40
// stores or rollback paths that bypass the auto-stamp in ZKStore.setCurrentVersion).
Version currentVersionMeta = store.getVersion(currentVersion);
int priorCurrentVersion =
(currentVersionMeta != null) ? currentVersionMeta.getPreviousCurrentVersion() : NON_EXISTING_VERSION;
if (readyToBeRemovedVersions.removeIf(v -> v.getNumber() == priorCurrentVersion)) {
LOGGER.info(
"Preserving prior current version: {} of store: {} in cluster: {} from backup cleanup",
priorCurrentVersion,
store.getName(),
clusterName);
} else {
readyToBeRemovedVersions.remove(0);
}
}
if (readyToBeRemovedVersions.isEmpty()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,95 @@ public void testCleanupBackupVersion_StoreQualifiedButOnlyHasOneVersion() {
Assert.assertFalse(service.cleanupBackupVersion(storeWithOneVersion, CLUSTER_NAME));
}

/**
 * Builds a mocked store through {@code mockStore} and, if the store returns a version object for
 * {@code currentVersion}, stubs that version's {@code getPreviousCurrentVersion()} to return
 * {@code priorCurrentVersion}. This simulates the auto-stamp performed by
 * ZKStore.setCurrentVersion at swap time. When no version object is returned for
 * {@code currentVersion}, the store is handed back without any extra stubbing.
 */
private Store mockStoreWithPriorCurrent(
    long backupVersionRetentionMs,
    long latestVersionPromoteToCurrentTimestamp,
    Map<Integer, VersionStatus> versions,
    int currentVersion,
    int priorCurrentVersion) {
  Store mockedStore =
      mockStore(backupVersionRetentionMs, latestVersionPromoteToCurrentTimestamp, versions, currentVersion);
  Version currentVersionMeta = mockedStore.getVersion(currentVersion);
  if (currentVersionMeta == null) {
    // Nothing to stamp: the mocked store has no entry for the current version.
    return mockedStore;
  }
  doReturn(priorCurrentVersion).when(currentVersionMeta).getPreviousCurrentVersion();
  return mockedStore;
}

/**
 * Builds a version-number to status map from a flat list of alternating
 * {@code (Integer version, VersionStatus status)} arguments.
 *
 * @param versionStatusPairs alternating version numbers and statuses; must contain an even
 *        number of elements
 * @return a mutable map from version number to status
 * @throws IllegalArgumentException if an odd number of arguments is supplied
 * @throws ClassCastException if an element is not of the expected type for its position
 */
private static Map<Integer, VersionStatus> versionsMap(Object... versionStatusPairs) {
  // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException on the last pair.
  if (versionStatusPairs.length % 2 != 0) {
    throw new IllegalArgumentException(
        "Expected alternating (version, status) arguments but got an odd count: " + versionStatusPairs.length);
  }
  Map<Integer, VersionStatus> versionToStatus = new HashMap<>();
  for (int i = 0; i < versionStatusPairs.length; i += 2) {
    versionToStatus.put((Integer) versionStatusPairs[i], (VersionStatus) versionStatusPairs[i + 1]);
  }
  return versionToStatus;
}

/**
 * Collects the given version numbers into a mutable {@link HashSet}; duplicates collapse into a
 * single entry and an empty argument list yields an empty set.
 */
private static Set<Integer> versionSet(int... versions) {
  return java.util.stream.IntStream.of(versions)
      .boxed()
      .collect(java.util.stream.Collectors.toCollection(HashSet::new));
}

/**
 * Supplies the scenarios for the prior-current preservation test. Each row is laid out as
 * { versions, currentVersion, priorCurrentVersion, expectedDeletedVersions, description }.
 */
@org.testng.annotations.DataProvider(name = "priorCurrentPreservationParams")
public Object[][] priorCurrentPreservationParams() {
  // A stale PUSHED version sits between the prior current (v1) and the current (v3).
  Object[] stalePushedBetweenCurrents =
      { versionsMap(1, VersionStatus.ONLINE, 2, VersionStatus.PUSHED, 3, VersionStatus.ONLINE), 3, 1, versionSet(2),
          "priorCurrent v1 preserved, stale PUSHED v2 deleted" };

  // Same shape with one more older backup version in the history.
  Object[] longerHistory = { versionsMap(
      1,
      VersionStatus.ONLINE,
      2,
      VersionStatus.ONLINE,
      3,
      VersionStatus.PUSHED,
      4,
      VersionStatus.ONLINE), 4, 2, versionSet(1, 3),
      "longer history bug: priorCurrent v2 preserved, stale PUSHED v3 cleaned" };

  // Strictly sequential pushes: the prior current is also the newest backup.
  Object[] sequentialPushes = { versionsMap(
      1,
      VersionStatus.ONLINE,
      2,
      VersionStatus.ONLINE,
      3,
      VersionStatus.ONLINE,
      4,
      VersionStatus.ONLINE), 4, 3, versionSet(1, 2), "sequential pushes match legacy keep-newest" };

  // The prior current version was never stamped on the current version.
  Object[] priorCurrentUnset =
      { versionsMap(1, VersionStatus.ONLINE, 2, VersionStatus.ONLINE, 3, VersionStatus.ONLINE), 3,
          Store.NON_EXISTING_VERSION, versionSet(1),
          "priorCurrent unset (Store.NON_EXISTING_VERSION=0) falls back to keep-newest" };

  return new Object[][] { stalePushedBetweenCurrents, longerHistory, sequentialPushes, priorCurrentUnset };
}

/**
 * Runs backup-version cleanup against a mocked store whose current version carries a
 * previous-current-version value, then checks that exactly the versions listed in
 * {@code expectedDeletedVersions} were passed to {@code deleteOldVersionInStore} and that no
 * other version of the store was.
 */
@Test(dataProvider = "priorCurrentPreservationParams")
public void testCleanupBackupVersion_PriorCurrentPreservation(
    Map<Integer, VersionStatus> versions,
    int currentVersion,
    int priorCurrentVersion,
    Set<Integer> expectedDeletedVersions,
    String description) {
  // Promotion timestamp is two default retention periods in the past.
  long promotedToCurrentAt = mockTime.getMilliseconds() - DEFAULT_RETENTION_MS * 2;
  Store store = mockStoreWithPriorCurrent(-1, promotedToCurrentAt, versions, currentVersion, priorCurrentVersion);

  boolean cleanedUp = service.cleanupBackupVersion(store, CLUSTER_NAME);
  Assert.assertTrue(cleanedUp, description);

  // Expected-deleted versions must be deleted at least once; every other version never.
  versions.keySet()
      .forEach(
          version -> verify(admin, expectedDeletedVersions.contains(version) ? atLeast(1) : never())
              .deleteOldVersionInStore(CLUSTER_NAME, store.getName(), version));
}

@Test
public void testCleanupBackupVersion_StoreQualifiedWithOneRemovableVersion() {
Map<Integer, VersionStatus> versions = new HashMap<>();
Expand Down