Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4516,6 +4516,28 @@ private OfflinePushStatusInfo getOffLineJobStatus(
Store parentStore = repository.getStore(storeName);
Version version = parentStore.getVersion(versionNum);

// Version may be absent between two consecutive VPJ polls (retention eviction, supersession,
// controller leadership change, or manual cleanup). The protocol already defines terminal
// values for this state — return one with HTTP 200 so VPJ exits its poll loop, rather than
// letting an unguarded deref below escape as HTTP 500.
if (version == null) {
ExecutionStatus terminalStatus = versionNum <= parentStore.getLargestUsedVersionNumber()
? ExecutionStatus.ARCHIVED

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For archived status, will VPJ stop polling and mark as failed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, VPJ will fail the push when it sees ARCHIVED

: ExecutionStatus.NOT_CREATED;
Comment thread
jingy-li marked this conversation as resolved.
Outdated
LOGGER.info(
"Version {} for store {} no longer present in parent store metadata; returning {}",
versionNum,
storeName,
terminalStatus);
return new OfflinePushStatusInfo(
terminalStatus,
null,
extraInfo,
"Parent version " + versionNum + " no longer present in store metadata",
extraDetails,
extraInfoUpdateTimestamp);
}

// Check if push is in a terminal status in target regions for pushes using deferred swap and try
// updating the parent status. Parent status should only be updated if it is currently in a STARTED state to avoid
// multiple duplicate updates as vpj will keep polling until all regions are complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,58 @@ public void testKilledVersionExecutionStatus() {
assertEquals(offlineJobStatus.getExecutionStatus(), ExecutionStatus.ERROR);
}

@Test
public void testAbsentVersionReturnsArchived() {
// Version is null (e.g. pruned by retention/supersession) and versionNum <= largestUsedVersionNumber,
// meaning the version existed at some point. Expect ARCHIVED instead of NPE/500.
Map<ExecutionStatus, ControllerClient> clientMap = getMockJobStatusQueryClient();
Map<String, ControllerClient> controllerClients = new HashMap<>();
controllerClients.put("cluster", clientMap.get(ExecutionStatus.STARTED));

Store store = mock(Store.class);
doReturn(false).when(store).isIncrementalPushEnabled();
doReturn(store).when(internalAdmin).getStore(anyString(), anyString());
doReturn(null).when(store).getVersion(anyInt());
doReturn(5).when(store).getLargestUsedVersionNumber();

HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
doReturn(mock(ClusterLockManager.class)).when(resources).getClusterLockManager();
doReturn(resources).when(internalAdmin).getHelixVeniceClusterResources(anyString());
ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
doReturn(repository).when(resources).getStoreMetadataRepository();
doReturn(store).when(repository).getStore(anyString());

Admin.OfflinePushStatusInfo offlineJobStatus =
parentAdmin.getOffLineJobStatus("IGNORED", "topic1_v3", controllerClients);
assertEquals(offlineJobStatus.getExecutionStatus(), ExecutionStatus.ARCHIVED);
}

@Test
public void testAbsentVersionReturnsNotCreated() {
// Version is null and versionNum > largestUsedVersionNumber, meaning the requested version
// was never created. Expect NOT_CREATED instead of NPE/500.
Map<ExecutionStatus, ControllerClient> clientMap = getMockJobStatusQueryClient();
Map<String, ControllerClient> controllerClients = new HashMap<>();
controllerClients.put("cluster", clientMap.get(ExecutionStatus.STARTED));

Store store = mock(Store.class);
doReturn(false).when(store).isIncrementalPushEnabled();
doReturn(store).when(internalAdmin).getStore(anyString(), anyString());
doReturn(null).when(store).getVersion(anyInt());
doReturn(2).when(store).getLargestUsedVersionNumber();

HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
doReturn(mock(ClusterLockManager.class)).when(resources).getClusterLockManager();
doReturn(resources).when(internalAdmin).getHelixVeniceClusterResources(anyString());
ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
doReturn(repository).when(resources).getStoreMetadataRepository();
doReturn(store).when(repository).getStore(anyString());

Admin.OfflinePushStatusInfo offlineJobStatus =
parentAdmin.getOffLineJobStatus("IGNORED", "topic1_v9", controllerClients);
assertEquals(offlineJobStatus.getExecutionStatus(), ExecutionStatus.NOT_CREATED);
}

@Test
public void testUpdateStore() {
String storeName = Utils.getUniqueString("testUpdateStore");
Expand Down