Skip to content

Commit

Permalink
[controller] Delete leaking backup versions from repush (#1084)
Browse files Browse the repository at this point in the history
If there are still leaking versions due to consecutive repush and intermittent failed push, there could be versions with repushSourceVersion which does not match the current version. This PR will delete those versions after backup retention period.
  • Loading branch information
majisourav99 authored Aug 2, 2024
1 parent 0b6b41d commit 0f250df
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
: v.getNumber() < currentVersion)
.collect(Collectors.toList());

// If there are still leaking versions due to consecutive repushes with some version failing in other fabric,
// there could be versions with repushSourceVersion does not match current version, delete them after backup
// retention period.
if (readyToBeRemovedVersions.isEmpty()) {
for (Version version: versions) {
if (version.getNumber() < currentVersion && store.getLatestVersionPromoteToCurrentTimestamp()
+ defaultBackupVersionRetentionMs < time.getMilliseconds()) {
readyToBeRemovedVersions.add(version);
}
}
}
if (readyToBeRemovedVersions.isEmpty()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ private Store mockStore(
versionList.add(v);
});
doReturn(versionList).when(store).getVersions();
doReturn(versionList.get(versionList.size() - 1)).when(store).getVersion(currentVersion);
for (int i = 0; i < versionList.size(); i++) {
doReturn(versionList.get(i)).when(store).getVersion(i + 1);
}
doReturn(versionList.get(versionList.size() - 1)).when(store).getVersionOrThrow(currentVersion);
return store;
}
Expand Down Expand Up @@ -201,6 +203,44 @@ public void testCleanupBackupVersion() {
Assert.assertFalse(service.cleanupBackupVersion(storeWithRollback, clusterName));
}

@Test
public void testCleanupBackupVersionRepush() {
VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class);
VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class);
long defaultRetentionMs = TimeUnit.DAYS.toMillis(7);
doReturn(defaultRetentionMs).when(config).getBackupVersionDefaultRetentionMs();
VeniceControllerClusterConfig controllerConfig = mock(VeniceControllerClusterConfig.class);
doReturn(controllerConfig).when(config).getControllerConfig(anyString());
doReturn(mockClusterResource).when(admin).getHelixVeniceClusterResources(anyString());
doReturn(clusterManager).when(mockClusterResource).getRoutersClusterManager();
StoreBackupVersionCleanupService service =
new StoreBackupVersionCleanupService(admin, config, mock(MetricsRepository.class));

String clusterName = "test_cluster";
// Store is not qualified because of short life time of backup version
Map<Integer, VersionStatus> versions = new HashMap<>();
versions.put(1, VersionStatus.ONLINE);
versions.put(2, VersionStatus.ONLINE);
Store storeWithFreshBackupVersion = mockStore(-1, System.currentTimeMillis(), versions, 2);
storeWithFreshBackupVersion.getVersion(2).setRepushSourceVersion(1);
Assert.assertFalse(service.cleanupBackupVersion(storeWithFreshBackupVersion, clusterName));

versions.clear();
versions.put(1, VersionStatus.ONLINE);
versions.put(2, VersionStatus.ONLINE);
versions.put(3, VersionStatus.ONLINE);
Store storeWithRollback = mockStore(-1, System.currentTimeMillis() - defaultRetentionMs * 2, versions, 3);
Version version = storeWithRollback.getVersion(3);
doReturn(2).when(version).getRepushSourceVersion();

// should delete version 2 as 1 is the true backup
Assert.assertTrue(service.cleanupBackupVersion(storeWithRollback, clusterName));
TestUtils.waitForNonDeterministicAssertion(
1,
TimeUnit.SECONDS,
() -> verify(admin, atLeast(1)).deleteOldVersionInStore(clusterName, storeWithRollback.getName(), 2));
}

@Test
public void testMetadataBasedCleanupBackupVersion() throws IOException {
VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class);
Expand Down

0 comments on commit 0f250df

Please sign in to comment.