Commit 9750b52

[controller][vpj][protocol] Propagate KILL job trigger and details from VPJ to servers

Add support for propagating the real KILL caller information (trigger reason + details) from VenicePushJob through Controllers to Servers, enabling better observability and debugging of push job terminations.

Key changes:
- Add PushJobKillTrigger enum with trigger types: USER_REQUEST, VERSION_RETIREMENT, SLA_VIOLATION, PREEMPTED_BY_FULL_PUSH, INGESTION_FAILURE, VERSION_CREATION_FAILURE, PUSH_JOB_FAILED, LINGERING_VERSION_TOPIC, UNKNOWN
- Add AdminOperation v96 schema with trigger/details fields in KillOfflinePushJob
- Add ParticipantMessageValue v2 schema with trigger/details fields in KillPushJob
- Update Admin.killOfflinePush() and StoreCleaner.deleteOneStoreVersion() interfaces
- Propagate trigger/details through VeniceHelixAdmin and VeniceParentHelixAdmin
- Add PUSH_JOB_KILL_TRIGGER and PUSH_JOB_KILL_DETAILS controller API params
- VPJ uses SLA_VIOLATION for timeout kills, PUSH_JOB_FAILED for error cleanup
- Add SLA_TIMEOUT error type
1 parent 90250e9 commit 9750b52

28 files changed: +1722 -66 lines changed
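For orientation, a minimal sketch of how a caller exercises the kill path added in this commit. Only cancel(PushJobKillTrigger, String) and the PushJobKillTrigger enum come from this diff; the wrapper class, the method name, and the way the job instance is obtained are illustrative assumptions.

import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.participant.protocol.enums.PushJobKillTrigger;

public class KillWithTriggerSketch {
  // Cancels a running push job and records why it was killed. Per this commit, the
  // no-arg cancel() delegates to this overload with USER_REQUEST, while the internal
  // timeout monitor passes SLA_VIOLATION plus a human-readable detail string.
  public static void cancelForSla(VenicePushJob pushJob, long timeoutMs) {
    String details = "Push job exceeded timeout: " + timeoutMs + " ms";
    pushJob.cancel(PushJobKillTrigger.SLA_VIOLATION, details);
  }
}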

build.gradle

Lines changed: 1 addition & 1 deletion
@@ -328,7 +328,7 @@ subprojects {
       // project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
       def versionOverrides = [
           project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v39', PathValidation.DIRECTORY),
-          project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v94', PathValidation.DIRECTORY)
+          project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v96', PathValidation.DIRECTORY)
       ]

       def schemaDirs = [sourceDir]

clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java

Lines changed: 34 additions & 7 deletions
@@ -133,6 +133,7 @@
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.meta.VersionStatus;
 import com.linkedin.venice.meta.ViewConfig;
+import com.linkedin.venice.participant.protocol.enums.PushJobKillTrigger;
 import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
 import com.linkedin.venice.partitioner.VenicePartitioner;
 import com.linkedin.venice.pushmonitor.ExecutionStatus;
@@ -987,11 +988,14 @@ private void setupJobTimeoutMonitor() {
     }

     LOGGER.info("Scheduling timeout executor for store: {} with timeout: {}ms", pushJobSetting.storeName, timeoutMs);
+    final long capturedTimeoutMs = timeoutMs;
     timeoutExecutor.schedule(() -> {
-      cancel();
+      String timeoutDetails = "Push job exceeded timeout: " + capturedTimeoutMs + " ms ("
+          + TimeUnit.MILLISECONDS.toHours(capturedTimeoutMs) + " hours)";
+      cancel(PushJobKillTrigger.SLA_VIOLATION, timeoutDetails);
       throw new VeniceTimeoutException(
-          "Failing push-job for store " + pushJobSetting.storeName + " which is still running after " + timeoutMs
-              + " ms (" + TimeUnit.MILLISECONDS.toHours(timeoutMs) + " hours)");
+          "Failing push-job for store " + pushJobSetting.storeName + " which is still running after "
+              + capturedTimeoutMs + " ms (" + TimeUnit.MILLISECONDS.toHours(capturedTimeoutMs) + " hours)");
     }, timeoutMs, TimeUnit.MILLISECONDS);
   }

@@ -2916,10 +2920,20 @@ private String pushJobPropertiesToString(

   /**
    * A cancel method for graceful cancellation of the running Job to be invoked as a result of user actions or due to
-   * the job exceeding bootstrapToOnlineTimeoutInHours.
+   * the job exceeding bootstrapToOnlineTimeoutInHours. This is the default cancel that doesn't specify a trigger,
+   * which uses USER_REQUEST as the default trigger.
    */
   public void cancel() {
-    killJob(pushJobSetting, controllerClient);
+    cancel(PushJobKillTrigger.USER_REQUEST, null);
+  }
+
+  /**
+   * A cancel method for graceful cancellation of the running Job with a specific trigger reason.
+   * @param trigger the reason for cancellation
+   * @param details additional details about the cancellation
+   */
+  public void cancel(PushJobKillTrigger trigger, String details) {
+    killJob(pushJobSetting, controllerClient, trigger, details);
     if (StringUtils.isEmpty(pushJobSetting.topic)) {
       pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue()));
     } else {
@@ -2930,6 +2944,14 @@ public void cancel() {
   }

   void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
+    killJob(pushJobSetting, controllerClient, PushJobKillTrigger.PUSH_JOB_FAILED, null);
+  }
+
+  void killJob(
+      PushJobSetting pushJobSetting,
+      ControllerClient controllerClient,
+      PushJobKillTrigger trigger,
+      String details) {
     // Attempting to kill job. There's a race condition, but meh. Better kill when you know it's running
     killDataWriterJob();
     if (!pushJobSetting.isIncrementalPush) {
@@ -2945,11 +2967,16 @@ void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient) {
     if (StringUtils.isEmpty(pushJobSetting.topic)) {
       LOGGER.error("Could not find a store version to delete for store: {}", pushJobSetting.storeName);
     } else {
+      String triggerStr = trigger != null ? trigger.name() : PushJobKillTrigger.UNKNOWN.name();
       ControllerClient.retryableRequest(
           controllerClient,
           pushJobSetting.controllerRetries,
-          c -> c.killOfflinePushJob(pushJobSetting.topic));
-      LOGGER.info("Offline push job has been killed, topic: {}", pushJobSetting.topic);
+          c -> c.killOfflinePushJob(pushJobSetting.topic, triggerStr, details));
+      LOGGER.info(
+          "Offline push job has been killed, topic: {}, trigger: {}, details: {}",
+          pushJobSetting.topic,
+          trigger,
+          details);
     }
   }
 }

clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java

Lines changed: 3 additions & 2 deletions
@@ -90,6 +90,7 @@
 import com.linkedin.venice.meta.VersionStatus;
 import com.linkedin.venice.meta.ViewConfig;
 import com.linkedin.venice.meta.ViewConfigImpl;
+import com.linkedin.venice.participant.protocol.enums.PushJobKillTrigger;
 import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
 import com.linkedin.venice.pushmonitor.ExecutionStatus;
 import com.linkedin.venice.schema.AvroSchemaParseUtils;
@@ -601,14 +602,14 @@ public void testDataWriterComputeJobTimeout(Class<? extends DataWriterComputeJob
       doNothing().when(dataWriterJob).validateJob();
       doNothing().when(dataWriterJob).configure(any(), any()); // the spark job takes a long time to configure
       doAnswer(stallDataWriterJob).when(dataWriterJob).runComputeJob();
-      doAnswer(killDataWriterJob).when(pushJob).killJob(any(), any());
+      doAnswer(killDataWriterJob).when(pushJob).killJob(any(), any(), any(PushJobKillTrigger.class), any());
       pushJob.run(); // data writer job will run in this main test thread
     } catch (VeniceException e) {
       // Expected, because the data writer job is not configured to run successfully in this unit test environment
     }
     assertEquals(runningJobLatch.getCount(), 0); // killDataWriterJob() does not occur in the main test thread
     assertEquals(killedJobLatch.getCount(), 0);
-    verify(pushJob, times(1)).cancel();
+    verify(pushJob, times(1)).cancel(eq(PushJobKillTrigger.SLA_VIOLATION), anyString());
     verify(dataWriterJob, times(1)).kill();
     assertEquals(pushJob.getDataWriterComputeJob().getStatus(), DataWriterComputeJob.Status.KILLED);
   }

internal/venice-client-common/src/main/java/com/linkedin/venice/exceptions/ErrorType.java

Lines changed: 2 additions & 1 deletion
@@ -16,7 +16,8 @@ public enum ErrorType {
   @JsonEnumDefaultValue
   GENERAL_ERROR(ExceptionType.GENERAL_ERROR), BAD_REQUEST(ExceptionType.BAD_REQUEST),
   CONCURRENT_BATCH_PUSH(ExceptionType.BAD_REQUEST), RESOURCE_STILL_EXISTS(ExceptionType.BAD_REQUEST),
-  PROTOCOL_ERROR(ExceptionType.BAD_REQUEST), ACL_ERROR(ExceptionType.BAD_REQUEST),;
+  PROTOCOL_ERROR(ExceptionType.BAD_REQUEST), ACL_ERROR(ExceptionType.BAD_REQUEST),
+  SLA_TIMEOUT(ExceptionType.GENERAL_ERROR);

   private final ExceptionType exceptionType;

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java

Lines changed: 6 additions & 0 deletions
@@ -292,4 +292,10 @@ public class ControllerApiConstants {
    * Params for dark cluster
    */
   public static final String STORES_TO_REPLICATE = "stores_to_replicate";
+
+  /**
+   * Params for KILL push job
+   */
+  public static final String PUSH_JOB_KILL_TRIGGER = "push_job_kill_trigger";
+  public static final String PUSH_JOB_KILL_DETAILS = "push_job_kill_details";
 }

internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java

Lines changed: 12 additions & 0 deletions
@@ -45,6 +45,8 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.POSITION;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_IN_SORTED_ORDER;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_ID;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_KILL_DETAILS;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_KILL_TRIGGER;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_STRATEGY;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_TYPE;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_OPERATION;
@@ -704,12 +706,22 @@ public ControllerResponse rollForwardToFutureVersion(String storeName) {
   }

   public ControllerResponse killOfflinePushJob(String kafkaTopic) {
+    return killOfflinePushJob(kafkaTopic, null, null);
+  }
+
+  public ControllerResponse killOfflinePushJob(String kafkaTopic, String trigger, String details) {
     String store = Version.parseStoreFromKafkaTopicName(kafkaTopic);
     int versionNumber = Version.parseVersionFromKafkaTopicName(kafkaTopic);
     QueryParams params = newParams().add(TOPIC, kafkaTopic) // TODO: remove once the controller is deployed to handle
                                                             // store and version instead
         .add(NAME, store)
         .add(VERSION, versionNumber);
+    if (trigger != null) {
+      params.add(PUSH_JOB_KILL_TRIGGER, trigger);
+    }
+    if (details != null) {
+      params.add(PUSH_JOB_KILL_DETAILS, details);
+    }
     return request(ControllerRoute.KILL_OFFLINE_PUSH_JOB, params, ControllerResponse.class);
   }

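A usage sketch for the new ControllerClient overload above. The three-argument killOfflinePushJob and the null-means-omitted behavior come from this diff; the wrapper class, the method name, and the example topic and detail strings are illustrative assumptions. Passing the enum name as the trigger string mirrors what VenicePushJob.killJob() does.

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.participant.protocol.enums.PushJobKillTrigger;

public class KillOfflinePushJobSketch {
  // Kills the offline push for a version topic and attaches the kill reason.
  // The trigger travels as the enum's string name; null trigger/details are
  // simply not added to the query params, matching the old single-argument call.
  public static ControllerResponse killWithReason(ControllerClient client, String versionTopic) {
    return client
        .killOfflinePushJob(versionTopic, PushJobKillTrigger.USER_REQUEST.name(), "Manually aborted by store owner");
  }
}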
internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java

Lines changed: 6 additions & 2 deletions
@@ -64,6 +64,8 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_DETAILS;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_DURATION;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_ID;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_KILL_DETAILS;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_KILL_TRIGGER;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_JOB_STATUS;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_STRATEGY;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_TYPE;
@@ -158,8 +160,10 @@ public enum ControllerRoute implements VeniceDimensionInterface {
   UPDATE_DARK_CLUSTER_CONFIG(
       "/update_dark_cluster_config", HttpMethod.POST, Collections.singletonList(CLUSTER), STORES_TO_REPLICATE
   ), JOB("/job", HttpMethod.GET, Arrays.asList(NAME, VERSION)),
-  KILL_OFFLINE_PUSH_JOB("/kill_offline_push_job", HttpMethod.POST, Collections.singletonList(TOPIC)),
-  LIST_STORES("/list_stores", HttpMethod.GET, Collections.emptyList(), INCLUDE_SYSTEM_STORES),
+  KILL_OFFLINE_PUSH_JOB(
+      "/kill_offline_push_job", HttpMethod.POST, Collections.singletonList(TOPIC), PUSH_JOB_KILL_TRIGGER,
+      PUSH_JOB_KILL_DETAILS
+  ), LIST_STORES("/list_stores", HttpMethod.GET, Collections.emptyList(), INCLUDE_SYSTEM_STORES),
   CLEAN_EXECUTION_IDS("/clean_execution_ids", HttpMethod.GET, Collections.emptyList(), CLUSTER),
   LIST_CHILD_CLUSTERS("/list_child_clusters", HttpMethod.GET, Collections.emptyList()),
   LIST_NODES("/list_instances", HttpMethod.GET, Collections.emptyList()),

internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreCleaner.java

Lines changed: 6 additions & 1 deletion
@@ -1,7 +1,12 @@
 package com.linkedin.venice.meta;

 public interface StoreCleaner {
-  void deleteOneStoreVersion(String clusterName, String storeName, int versionNumber);
+  void deleteOneStoreVersion(
+      String clusterName,
+      String storeName,
+      int versionNumber,
+      boolean isForcedDelete,
+      boolean deleteDueToError);

   void retireOldStoreVersions(
       String clusterName,
internal/venice-common/src/main/java/com/linkedin/venice/participant/protocol/enums/PushJobKillTrigger.java

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+package com.linkedin.venice.participant.protocol.enums;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Represents the trigger that caused an offline push job to be terminated.
+ */
+public enum PushJobKillTrigger {
+  // User/system-initiated actions
+  USER_REQUEST(0), // Push job was killed due to explicit user request
+  VERSION_RETIREMENT(1), // Push job was killed due to version retirement
+
+  // System-detected violations or preemption
+  SLA_VIOLATION(2), // Push job violated SLA (e.g., took too long)
+  PREEMPTED_BY_FULL_PUSH(3), // Repush was preempted by a new full/batch push
+
+  // Failures during ingestion or version setup
+  INGESTION_FAILURE(4), // Push job failed due to ingestion errors
+  VERSION_CREATION_FAILURE(5), // Push job failed due to version creation errors
+  PUSH_JOB_FAILED(6), // Generic failure during push job
+
+  // Cleanup and maintenance triggers
+  LINGERING_VERSION_TOPIC(7), // Push job was killed due to lingering version topic
+
+  // Catch-all
+  UNKNOWN(8); // Unknown reason
+
+  private final int code;
+
+  private static final Map<Integer, PushJobKillTrigger> CODE_MAP = new HashMap<>(8);
+  private static final Map<String, PushJobKillTrigger> NAME_MAP = new HashMap<>(8);
+
+  static {
+    for (PushJobKillTrigger trigger: values()) {
+      CODE_MAP.put(trigger.code, trigger);
+      NAME_MAP.put(trigger.name().toLowerCase(), trigger);
+    }
+  }
+
+  PushJobKillTrigger(int code) {
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  /**
+   * Returns the enum constant for the given code.
+   *
+   * @param code the integer code
+   * @return the corresponding PushJobKillTrigger, or null if not found
+   */
+  public static PushJobKillTrigger fromCode(int code) {
+    return CODE_MAP.get(code);
+  }
+
+  /**
+   * Returns the enum constant for the given name (case-insensitive).
+   *
+   * @param name the name of the enum constant
+   * @return the corresponding PushJobKillTrigger, or null if not found
+   */
+  public static PushJobKillTrigger fromString(String name) {
+    if (name == null) {
+      return null;
+    }
+    return NAME_MAP.get(name.toLowerCase());
+  }
+}

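A sketch of how a receiving component might map the raw trigger value back onto the enum defined above. Only fromString(), fromCode(), getCode(), and UNKNOWN come from this diff; the wrapper class and the fallback-to-UNKNOWN policy are assumptions, since both lookups simply return null for missing or unrecognized input.

import com.linkedin.venice.participant.protocol.enums.PushJobKillTrigger;

public class TriggerParsingSketch {
  // Parses a trigger name such as "SLA_VIOLATION" (matched case-insensitively)
  // back into the enum, falling back to UNKNOWN when the value is absent or unknown.
  public static PushJobKillTrigger parseTriggerName(String rawTriggerName) {
    PushJobKillTrigger trigger = PushJobKillTrigger.fromString(rawTriggerName);
    return trigger != null ? trigger : PushJobKillTrigger.UNKNOWN;
  }

  // Same idea for the integer form: fromCode() reverses getCode().
  public static PushJobKillTrigger parseTriggerCode(int code) {
    PushJobKillTrigger trigger = PushJobKillTrigger.fromCode(code);
    return trigger != null ? trigger : PushJobKillTrigger.UNKNOWN;
  }
}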
internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java

Lines changed: 2 additions & 2 deletions
@@ -77,7 +77,7 @@ public enum AvroProtocolDefinition {
    *
    * TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
    */
-  ADMIN_OPERATION(94, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
+  ADMIN_OPERATION(96, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

   /**
    * Single chunk of a large multi-chunk value. Just a bunch of bytes.
@@ -168,7 +168,7 @@
   /**
    * Value schema for participant system stores.
    */
-  PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE(1, ParticipantMessageValue.class),
+  PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE(2, ParticipantMessageValue.class),

   /**
    * Response record for admin request.
