diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java index 961a36644e6..68380ff1d1c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java @@ -276,6 +276,17 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) { : v.getNumber() < currentVersion) .collect(Collectors.toList()); + // If there are still leaking versions due to consecutive repushes with some version failing in other fabric, + // there could be versions with repushSourceVersion does not match current version, delete them after backup + // retention period. + if (readyToBeRemovedVersions.isEmpty()) { + for (Version version: versions) { + if (version.getNumber() < currentVersion && store.getLatestVersionPromoteToCurrentTimestamp() + + defaultBackupVersionRetentionMs < time.getMilliseconds()) { + readyToBeRemovedVersions.add(version); + } + } + } if (readyToBeRemovedVersions.isEmpty()) { return false; } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java index 54c22319a48..04d8b530d07 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java @@ -81,7 +81,9 @@ private Store mockStore( versionList.add(v); }); doReturn(versionList).when(store).getVersions(); - doReturn(versionList.get(versionList.size() - 1)).when(store).getVersion(currentVersion); + for (int i = 0; i < versionList.size(); i++) { + doReturn(versionList.get(i)).when(store).getVersion(i + 1); + } doReturn(versionList.get(versionList.size() - 1)).when(store).getVersionOrThrow(currentVersion); return store; } @@ -201,6 +203,44 @@ public void testCleanupBackupVersion() { Assert.assertFalse(service.cleanupBackupVersion(storeWithRollback, clusterName)); } + @Test + public void testCleanupBackupVersionRepush() { + VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class); + VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class); + long defaultRetentionMs = TimeUnit.DAYS.toMillis(7); + doReturn(defaultRetentionMs).when(config).getBackupVersionDefaultRetentionMs(); + VeniceControllerClusterConfig controllerConfig = mock(VeniceControllerClusterConfig.class); + doReturn(controllerConfig).when(config).getControllerConfig(anyString()); + doReturn(mockClusterResource).when(admin).getHelixVeniceClusterResources(anyString()); + doReturn(clusterManager).when(mockClusterResource).getRoutersClusterManager(); + StoreBackupVersionCleanupService service = + new StoreBackupVersionCleanupService(admin, config, mock(MetricsRepository.class)); + + String clusterName = "test_cluster"; + // Store is not qualified because of short life time of backup version + Map versions = new HashMap<>(); + versions.put(1, VersionStatus.ONLINE); + versions.put(2, VersionStatus.ONLINE); + Store storeWithFreshBackupVersion = mockStore(-1, System.currentTimeMillis(), versions, 2); + storeWithFreshBackupVersion.getVersion(2).setRepushSourceVersion(1); + Assert.assertFalse(service.cleanupBackupVersion(storeWithFreshBackupVersion, clusterName)); + + versions.clear(); + versions.put(1, VersionStatus.ONLINE); + versions.put(2, VersionStatus.ONLINE); + versions.put(3, VersionStatus.ONLINE); + Store storeWithRollback = mockStore(-1, System.currentTimeMillis() - defaultRetentionMs * 2, versions, 3); + Version version = storeWithRollback.getVersion(3); + doReturn(2).when(version).getRepushSourceVersion(); + + // should delete version 2 as 1 is the true backup + Assert.assertTrue(service.cleanupBackupVersion(storeWithRollback, clusterName)); + TestUtils.waitForNonDeterministicAssertion( + 1, + TimeUnit.SECONDS, + () -> verify(admin, atLeast(1)).deleteOldVersionInStore(clusterName, storeWithRollback.getName(), 2)); + } + @Test public void testMetadataBasedCleanupBackupVersion() throws IOException { VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class);