Skip to content

Commit

Permalink
[controller] Update parent colo version status on killjob (#906)
Browse files Browse the repository at this point in the history
The nuage page shows the version status from the parent colo. For killed pushes, the status was not updated on the nuage page, leading users to think the push was successful. This PR updates the version status on the parent colo when a push job is killed. It also fixes the list_backup_version admin path.


---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Mar 25, 2024
1 parent 549bffa commit fd69874
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.MultiStoreStatusResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
Expand Down Expand Up @@ -153,7 +154,6 @@ public class TestHybrid {
private static final Logger LOGGER = LogManager.getLogger(TestHybrid.class);
public static final int STREAMING_RECORD_SIZE = 1024;
private static final long MIN_COMPACTION_LAG = 24 * Time.MS_PER_HOUR;
private static final long MAX_COMPACTION_LAG = 2 * MIN_COMPACTION_LAG;

/**
* IMPORTANT NOTE: if you use this sharedVenice cluster, please do not close it. The {@link #cleanUp()} function
Expand Down Expand Up @@ -246,6 +246,17 @@ public void testHybridInitializationOnMultiColo() throws IOException {
Assert.assertFalse(jobStatus.isError(), "Error in getting JobStatusResponse: " + jobStatus.getError());
assertEquals(jobStatus.getStatus(), "COMPLETED");
});
vcr = controllerClient.emptyPush(storeName, Utils.getUniqueString("empty-hybrid-push1"), 1L);
VersionCreationResponse finalVcr = vcr;
TestUtils.waitForNonDeterministicAssertion(100, TimeUnit.SECONDS, true, () -> {
// Now the store should have version 2
JobStatusQueryResponse jobStatus =
controllerClient.queryJobStatus(Version.composeKafkaTopic(storeName, finalVcr.getVersion()));
Assert.assertFalse(jobStatus.isError(), "Error in getting JobStatusResponse: " + jobStatus.getError());
assertEquals(jobStatus.getStatus(), "COMPLETED");
});
MultiStoreStatusResponse response = controllerClient.getBackupVersions(venice.getClusterName(), storeName);
Assert.assertEquals(response.getStoreStatusMap().get("dc-0"), "1");

// And real-time topic should exist now.
assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3940,7 +3940,14 @@ public void killOfflinePush(String clusterName, String kafkaTopic, boolean isFor
}
}

// TODO: Set parent controller's version status (to ERROR, most likely?)
HelixVeniceClusterResources resources = getVeniceHelixAdmin().getHelixVeniceClusterResources(clusterName);
try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
Store parentStore = repository.getStore(storeName);
int version = Version.parseVersionFromKafkaTopicName(kafkaTopic);
parentStore.updateVersionStatus(version, VersionStatus.KILLED);
repository.updateStore(parentStore);
}

KillOfflinePushJob killJob = (KillOfflinePushJob) AdminMessageType.KILL_OFFLINE_PUSH_JOB.getNewInstance();
killJob.clusterName = clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public boolean startInner() throws Exception {
httpService.get(CLUSTER_HEALTH_STORES.getPath(), storesRoutes.getAllStoresStatuses(admin));
httpService.get(STORE.getPath(), storesRoutes.getStore(admin));
httpService.get(FUTURE_VERSION.getPath(), storesRoutes.getFutureVersion(admin));
httpService.get(BACKUP_VERSION.getPath(), storesRoutes.getFutureVersion(admin));
httpService.get(BACKUP_VERSION.getPath(), storesRoutes.getBackupVersion(admin));
httpService.post(SET_TOPIC_COMPACTION.getPath(), storesRoutes.setTopicCompaction(admin));

httpService.post(UPDATE_CLUSTER_CONFIG.getPath(), clusterRoutes.updateClusterConfig(admin));
Expand Down

0 comments on commit fd69874

Please sign in to comment.