[controller][vpj][protocol] Propagate KILL job trigger and details from VPJ to servers #2463
Conversation
Force-pushed: 69b190f → db7bd80
Pull request overview
This PR aims to improve observability and control of ingestion/consumption flows by propagating richer “position” metadata through DaVinci ingestion/reporting paths, enhancing throttling and blob-transfer behavior, and updating CI automation.
Changes:
- Expanded ingestion APIs and notifications to operate on PubSubPosition (instead of raw offsets) and improved logging/metadata propagation.
- Added new services/utilities around ingestion/Helix transitions, changelog consumer state, and blob-transfer throttling/cancellation tracking.
- Updated GitHub workflows (stale PR automation and action version bumps/timeouts).
Reviewed changes
Copilot reviewed 26 out of 27 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java | Simplifies consumer pool strategy selection; updates consumption start API to PubSubPosition and ingestion-info API flags. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionThrottler.java | Passes adaptive throttling service stats into adaptive throttlers; removes deprecated AA/WC leader throttler setup. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/IngestionNotificationDispatcher.java | Adds ingestion progress function and migrates notifier callbacks to PubSubPosition. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/InactiveTopicPartitionChecker.java | New scheduled service to pause/resume partitions based on poll inactivity. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumerAction.java | Adds ability to attach a PubSubPosition to consumer actions. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AdaptiveThrottlerSignalService.java | Generalizes to VeniceAdaptiveThrottler, adds refresh interval config, daemon thread factory, and stats accessor. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractStoreBufferService.java | Extends snapshot sync API with a “last record persisted” future. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java | Updates core consumption API to PubSubPosition and extends ingestion-info API flag for redundant logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/utils/IsolatedIngestionUtils.java | Writes pubSubPosition into ingestion task reports instead of a raw offset. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionStorageMetadataService.java | Offsets API now accepts PubSubContext; improves replica-based logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionReportHandler.java | Decodes PubSubPosition from report payload and updates notifier calls. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionMonitorService.java | Adds PubSubPositionDeserializer setup and replica-based logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServerHandler.java | Improves replica logging; updates isolated ingestion startConsumption API and OffsetRecord construction. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java | Adds optional D2-less schema readers; improves thread factories and pubSubPosition logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionRequestClient.java | Logs replica + pubSubPosition and improves error logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionNotifier.java | Migrates notifier interface implementations to PubSubPosition. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java | Extends ingestion backend to accept optional PubSubPosition; improves logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java | Interface change: startConsumption now receives Optional<PubSubPosition>. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java | Updates notifier callbacks to use PubSubPosition; improves error logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModelFactory.java | Passes resourceName into state model constructor. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/LeaderFollowerPartitionStateModel.java | Tracks offline transition time for “graceful drop” delay; improves replica-aware logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixTransitionTimingUtils.java | New utility to log Helix message timing deltas/context formatting. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java | Stores PushStatusNotifier reference; adds temporary logging around CV reset; provides getter. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/AbstractPartitionStateModel.java | Adds store/version parsing, improved transition logs/timing breakdown, and store-type/role helpers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/stats/BasicConsumerStats.java | Refactors OpenTelemetry setup and changes version-swap metrics to Total; seeds default values. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapMessageState.java | New thread-safe state holder for version swap progress/checkpoints. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java | Adds stats emission + backoff; supports “swap by control message” mode; improves logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumer.java | Adds subscribe/unsubscribe helpers, deprecates pause/resume/seekToEndOfPush, adds heartbeat timestamps API, extends AutoCloseable. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangeCoordinate.java | Adds v3 serialization with consumer-sequence-id; uses PubSubConsumerAdapter for position diffs; expands API. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/StatefulVeniceChangelogConsumer.java | New higher-level consumer interface for local-state use cases. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/LocalBootstrappingVeniceChangelogConsumer.java | Removed wrapper class in favor of new consumer interfaces. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java | Renames offset→position, adds consumer sequence id and replication/control metadata accessors. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/BootstrappingVeniceChangelogConsumer.java | Removed legacy bootstrapping consumer interface. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/factory/VersionSpecificDaVinciClientFactory.java | New factory interface for version-specific DaVinci clients. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/VersionSpecificAvroGenericDaVinciClient.java | New version-specific client that subscribes to a fixed store version. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/SeekableDaVinciClient.java | New explicit “seekable client” interface for timestamp/checkpoint/tail operations. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/InternalDaVinciRecordTransformer.java | Expands transformer callbacks to include record metadata, heartbeats, and control messages; extends recovery API. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciSeekCheckpointInfo.java | New holder for seek checkpoint requests (timestamp/position/tail). |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerUtility.java | Fixes recovery deserialization to use writer schema; adds optional metadata; changes compat checks behavior. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerRecordMetadata.java | New per-record metadata object (schema id, timestamp, position, payload, RM info). |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerFunctionalInterface.java | Expands factory function signature to include store name. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformerConfig.java | Replaces “skip compatibility checks” with recordTransformationEnabled and adds record-metadata toggle; expands docs. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciClient.java | Javadoc fix: explicitly references VeniceException. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroSpecificSeekableDaVinciClient.java | New seekable specific-record client adapter. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroSpecificDaVinciClient.java | Updates superclass constructor invocation (storeVersion param). |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericSeekableDaVinciClient.java | New seekable generic client adapter. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java | Adds transfer stats and improves logging; changes file chunking; fixes request-limit handling. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java | Increases Netty worker threads and wires in blob transfer stats object. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java | Wires blob transfer stats into server handler. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PMetadataTransferHandler.java | Updates transferred offset record handling and writes incremental push status via notifier supplier. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/VeniceAdaptiveBlobTransferTrafficThrottler.java | New adaptive controller for Netty global traffic shaping read/write limits. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java | Adds config for max concurrent blob-receive replicas. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java | Adds per-replica executor, sequential peer-chain execution fix, and cancellation support. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java | Adds transfer status enum and adjusts SSL handshake timeout. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferStatusTrackingManager.java | New manager for blob transfer status/cancellation coordination. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferPayload.java | Adds baseDir/temp partition dir support. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java | Wires stats, adaptive throttlers, notifier supplier, and concurrency config into manager construction. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java | Refines exceptions for “no peers” vs “all peers failed” and exposes transfer status manager. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferGlobalTrafficShapingHandlerHolder.java | Minor formatting cleanup. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManager.java | Adds guards for unsupported engines; makes snapshot cleanup more resilient; logs incremental push status. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java | Supports seeking via checkpoint info and starts heartbeat monitoring per partition; improve replica logging. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java | Adds checkpoint-based seek/subscribe; supports seek-to-tail/all-partitions timestamp behavior. |
| .github/workflows/stale-prs.yml | New workflow to auto-stale/close inactive PRs. |
| .github/workflows/publish-javadoc.yml | Updates action versions for checkout and deploy. |
| .github/workflows/limit-pr-lines-added.yml | Updates checkout action version. |
| .github/workflows/build-and-upload-archives.yml | Updates checkout/setup-java action versions. |
| .github/workflows/build-and-upload-archives-upon-pushing-tags.yml | Updates checkout/setup-java action versions. |
| .github/workflows/build-and-upload-archives-on-schedule.yml | Updates checkout action version. |
| .github/workflows/build-and-upload-archives-on-demand.yml | Updates checkout/setup-java/upload-artifact action versions. |
| .github/workflows/build-and-publish-docker-images.yml | Adds stricter tag validation and timeouts; updates action versions. |
| .github/workflows/VeniceCI-StaticAnalysisAndUnitTests.yml | Updates checkout/setup-java/upload-artifact action versions; adjusts failure-job timeout. |
| .github/workflows/VeniceCI-CompatibilityTests.yml | Updates checkout/setup-java/upload-artifact action versions; reduces timeouts; adds background disk cleanup. |
| .github/workflows/UnitTests-core.yml | Updates checkout/setup-java/upload-artifact action versions; reduces timeout; adds background disk cleanup. |
| .github/rawWorkflows/gh-ci-parameterized-flow.txt | Updates action versions and switches artifact steps to always(). |
Copilot AI, Feb 8, 2026:
Paused partitions are skipped during inactivity detection (163-165), so currentlyInactiveTopicPartitions never contains them. As a result, the resume loop (182-186) resumes all previously paused partitions every cycle, even if they’re still inactive, causing repeated pause/resume thrashing. Fix by evaluating inactivity for paused partitions too (don’t continue), and only resume a previously-paused partition once it’s confirmed active; alternatively compute currentlyInactiveTopicPartitions over all assignments and then resume previouslyPaused - currentlyInactive.
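The suggested fix can be sketched with plain collections (hypothetical names; the real state lives in InactiveTopicPartitionChecker): evaluate inactivity over all assignments, including already-paused partitions, then resume only the previously paused partitions that are confirmed active again.

```java
import java.util.HashSet;
import java.util.Set;

public class InactivityResumeSketch {
  /**
   * Given the partitions paused in the previous cycle and the partitions
   * currently detected as inactive (computed over ALL assignments, including
   * already-paused ones), return the partitions that are safe to resume.
   */
  public static Set<String> partitionsToResume(Set<String> previouslyPaused, Set<String> currentlyInactive) {
    Set<String> toResume = new HashSet<>(previouslyPaused);
    // Resume only partitions no longer inactive; still-inactive ones stay paused,
    // which avoids the pause/resume thrashing described above.
    toResume.removeAll(currentlyInactive);
    return toResume;
  }

  public static void main(String[] args) {
    Set<String> paused = Set.of("topic-0", "topic-1");
    Set<String> stillInactive = Set.of("topic-1");
    System.out.println(partitionsToResume(paused, stillInactive)); // only topic-0 resumes
  }
}
```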
Copilot AI, Feb 8, 2026:
RandomAccessFile raf is never closed. ChunkedFile does not reliably guarantee the underlying RandomAccessFile is closed in all success/failure paths, which can leak file descriptors under load. Close raf in the sendFileFuture listener (both success and failure) and also handle the case where the write fails before the listener runs (e.g., wrap listener logic to close the handle in a finally-like block).
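The close-on-completion pattern the comment asks for can be sketched without Netty, using a CompletableFuture as a stand-in for sendFileFuture (names hypothetical; the real handler would attach a ChannelFutureListener instead):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.util.concurrent.CompletableFuture;

public class CloseOnCompleteSketch {
  /** Attach a completion callback that closes the file handle on success AND failure. */
  public static void closeWhenDone(CompletableFuture<?> transferFuture, RandomAccessFile raf) {
    transferFuture.whenComplete((result, error) -> {
      try {
        raf.close(); // runs regardless of outcome, so the descriptor never leaks
      } catch (Exception e) {
        // Closing failed; nothing else to release, so just log in real code.
      }
    });
  }

  public static void main(String[] args) throws Exception {
    File tmp = File.createTempFile("blob", ".bin");
    tmp.deleteOnExit();
    RandomAccessFile raf = new RandomAccessFile(tmp, "r");
    CompletableFuture<Void> transfer = new CompletableFuture<>();
    closeWhenDone(transfer, raf);
    // Simulate the write failing before/without the happy-path listener running:
    transfer.completeExceptionally(new RuntimeException("simulated transfer failure"));
    System.out.println(raf.getChannel().isOpen()); // false: closed even on failure
  }
}
```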
Copilot AI, Feb 8, 2026:
The field/method name postitionMap / getPostitionMap() is misspelled, and it becomes part of the public API surface. Rename to positionMap / getPositionMap() (and update parameter names accordingly) to avoid propagating a typo across call sites.
Copilot AI, Feb 8, 2026:
The Javadoc says this returns a 'signed difference', but the implementation returns comparator-style values in at least one branch (Long.compare(...) yields -1/0/1), while another branch may return an actual difference (pubSubConsumer.positionDifference(...)). Please make the contract consistent: either (a) document it as comparator semantics (negative/zero/positive) and ensure all branches follow that, or (b) return a true numeric difference everywhere (and rename the method to reflect that if needed).
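The distinction can be illustrated with plain stdlib calls (not Venice code): comparator semantics only guarantee the sign of the result, while a true signed difference also carries magnitude, so a caller relying on the documented contract can be misled by the comparator branch.

```java
public class DiffContractSketch {
  /** Comparator semantics: only the sign is meaningful (OpenJDK returns -1/0/1). */
  public static long comparatorStyle(long a, long b) {
    return Long.compare(a, b);
  }

  /** True signed difference: the magnitude is the actual distance between positions. */
  public static long signedDifference(long a, long b) {
    return Math.subtractExact(a, b); // throws on overflow instead of silently wrapping
  }

  public static void main(String[] args) {
    // A caller expecting a "signed difference" would badly misread the comparator result:
    System.out.println(comparatorStyle(100, 5));  // positive, sign only
    System.out.println(signedDifference(100, 5)); // 95
  }
}
```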
Copilot AI, Feb 8, 2026:
These Javadoc blocks are placed after @Override instead of before the annotations. Many style tools (and IDEs/Javadoc generators) won’t associate this documentation with the method properly. Move the Javadoc above the annotation so it attaches to the method declaration.
Force-pushed: db7bd80 → 9750b52
Force-pushed: 9750b52 → 80f92f8
Force-pushed: 80f92f8 → db35b3b
Pull request overview
Copilot reviewed 28 out of 29 changed files in this pull request and generated 8 comments.
     *
     * @param code the integer code
     * @return the corresponding KillPushJobTrigger, or null if not found
     */
Copilot AI, Feb 8, 2026:
Javadoc refers to KillPushJobTrigger, but the enum is PushJobKillTrigger. Updating the wording avoids confusion and makes searchability consistent.
    verify(admin, timeout(TIMEOUT)).createStore(clusterName, storeName, owner, keySchema, valueSchema, false);
    verify(admin, timeout(TIMEOUT)).killOfflinePush(clusterName, storeTopicName, false);
    verify(admin, timeout(TIMEOUT))
        .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), anyString(), eq(false));
Copilot AI, Feb 8, 2026:
Mockito verification uses anyString() for the new details argument, but the generated KillOfflinePushJob admin message in this test never sets details, so the production code will pass null. anyString() does not match null, which will make this assertion fail (or be brittle). Use a null-tolerant matcher (e.g., nullable(String.class) / any() / isNull() depending on what you want to assert).
Suggested change:
    -     .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), anyString(), eq(false));
    +     .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), any(), eq(false));
    verify(admin, never())
        .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), anyString(), eq(false));
Copilot AI, Feb 8, 2026:
This never() verification uses anyString() for details. If killOfflinePush is invoked with a null details (which is valid with the new schema), this matcher would not match and the test could incorrectly pass. Prefer a null-tolerant matcher for details so the test actually guards against unexpected calls.
    // Duplicate messages from the rewind should be skipped.
    verify(admin, times(10)).killOfflinePush(clusterName, topicName, false);
    verify(admin, times(10))
        .killOfflinePush(eq(clusterName), eq(topicName), any(PushJobKillTrigger.class), anyString(), eq(false));
Copilot AI, Feb 8, 2026:
times(10) verification uses anyString() for the new details arg, but details may be null when deserializing older/unspecified KillOfflinePushJob messages. Since anyString() doesn't match null, this will fail if details is not set. Use a matcher that accepts null (or explicitly assert non-null if the test sets it).
Suggested change:
    -     .killOfflinePush(eq(clusterName), eq(topicName), any(PushJobKillTrigger.class), anyString(), eq(false));
    +     .killOfflinePush(eq(clusterName), eq(topicName), any(PushJobKillTrigger.class), any(), eq(false));
    for (PushJobKillTrigger trigger: values()) {
      CODE_MAP.put(trigger.code, trigger);
      NAME_MAP.put(trigger.name().toLowerCase(), trigger);
    }
Copilot AI, Feb 8, 2026:
NAME_MAP is built using trigger.name().toLowerCase() which uses the default locale. For locale-stable behavior (e.g., Turkish locale edge cases), use toLowerCase(Locale.ROOT) when normalizing enum names.
    if (name == null) {
      return null;
    }
    return NAME_MAP.get(name.toLowerCase());
  }
Copilot AI, Feb 8, 2026:
fromString() uses name.toLowerCase() (default locale). Use toLowerCase(Locale.ROOT) to avoid locale-dependent mismatches when parsing trigger strings.
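A minimal sketch of the locale-stable lookup (a hypothetical enum mirroring PushJobKillTrigger's CODE_MAP/NAME_MAP, not the actual Venice class). In the Turkish locale, 'I' lowercases to dotless 'ı', so a default-locale toLowerCase() can produce keys that never match the map:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public enum TriggerSketch {
  USER_REQUEST(0), INGESTION_FAILURE(1), UNKNOWN(2);

  private static final Map<Integer, TriggerSketch> CODE_MAP = new HashMap<>();
  private static final Map<String, TriggerSketch> NAME_MAP = new HashMap<>();
  static {
    for (TriggerSketch trigger: values()) {
      CODE_MAP.put(trigger.code, trigger);
      // Locale.ROOT keeps the keys stable regardless of the JVM's default locale.
      NAME_MAP.put(trigger.name().toLowerCase(Locale.ROOT), trigger);
    }
  }

  private final int code;

  TriggerSketch(int code) {
    this.code = code;
  }

  /** @return the trigger for the given name (case-insensitive), or null if not recognized */
  public static TriggerSketch fromString(String name) {
    return name == null ? null : NAME_MAP.get(name.toLowerCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    // Under the Turkish locale every 'I' becomes 'ı', breaking a default-locale key:
    System.out.println("INGESTION_FAILURE".toLowerCase(Locale.forLanguageTag("tr")));
    System.out.println(fromString("Ingestion_Failure")); // INGESTION_FAILURE
  }
}
```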
    PushJobKillTrigger trigger =
        triggerStr != null ? PushJobKillTrigger.fromString(triggerStr) : PushJobKillTrigger.USER_REQUEST;
    if (trigger == null) {
      trigger = PushJobKillTrigger.UNKNOWN;
Copilot AI, Feb 8, 2026:
If a client supplies an unrecognized push_job_kill_trigger, this endpoint silently maps it to UNKNOWN. For observability/debugging, it’s safer to treat invalid trigger values as a 400 (BAD_REQUEST) so callers don’t think they successfully reported a specific trigger when they didn’t.
Suggested change:
    -       trigger = PushJobKillTrigger.UNKNOWN;
    +       response.status(HttpStatus.SC_BAD_REQUEST);
    +       responseObject.setError(
    +           "Invalid value for '" + PUSH_JOB_KILL_TRIGGER + "': '" + triggerStr + "'. Please provide a valid trigger.");
    +       responseObject.setErrorType(ErrorType.BAD_REQUEST);
    +       return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
    },
    {
      "name": "enumSchemaEvolutionAllowed",
      "doct": "Flag to control whether a certain store is allowed to evolve enum schema since the readers need to use Avro-1.9+",
Copilot AI, Feb 8, 2026:
Schema field uses doct instead of doc. Avro will ignore unknown properties, so this description won’t be carried through tooling; consider fixing to doc to match the rest of the schema metadata.
Suggested change:
    -   "doct": "Flag to control whether a certain store is allowed to evolve enum schema since the readers need to use Avro-1.9+",
    +   "doc": "Flag to control whether a certain store is allowed to evolve enum schema since the readers need to use Avro-1.9+",
Force-pushed: db35b3b → e175f33
Force-pushed: 4998420 → d55a12d
Pull request overview
Copilot reviewed 30 out of 31 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java:8261
- ParticipantMessageValue protocol version is bumped to v2 (see AvroProtocolDefinition), but this writer still uses PARTICIPANT_MESSAGE_STORE_SCHEMA_ID = 1 when putting the record. This will write v2 records under the v1 schema id and can break deserialization/compatibility in the participant message system store. Update the schema id to 2 (or derive it from AvroProtocolDefinition.PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE.getCurrentProtocolVersion()) to keep it in sync.
ParticipantMessageValue value = new ParticipantMessageValue();
value.messageType = killPushJobType.getValue();
value.messageUnion = message;
writer.put(key, value, PARTICIPANT_MESSAGE_STORE_SCHEMA_ID);
}
    // The error handling path calls killJob directly with PUSH_JOB_FAILED trigger
    verify(pushJob, times(1)).killJob(any(), any(), eq(PushJobKillTrigger.PUSH_JOB_FAILED), any());
    verify(pushJob.getDataWriterComputeJob(), times(1)).kill();
Copilot AI, Feb 8, 2026:
This test stubs client.killOfflinePushJob(anyString()), but the production code path now calls the new overload killOfflinePushJob(topic, trigger, details). Since the 3-arg method isn’t stubbed, the mock returns null and ControllerClient.retryableRequest(...) will NPE when it calls response.isError(). Stub the 3-arg overload (or use doReturn(response).when(client).killOfflinePushJob(anyString(), anyString(), anyString()/any())).
    verify(mockKafkaConsumer, timeout(TIMEOUT)).unSubscribe(any());
    verify(admin, never()).killOfflinePush(clusterName, storeTopicName, false);
    verify(admin, never())
        .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), anyString(), eq(false));
Copilot AI, Feb 8, 2026:
anyString() won’t match null for the new details argument, and the test message doesn’t set details (defaults to null). This verify(..., never()) can fail due to argument mismatch. Use a null-tolerant matcher for details or populate details in the test message.
Suggested change:
    -     .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), anyString(), eq(false));
    +     .killOfflinePush(eq(clusterName), eq(storeTopicName), any(PushJobKillTrigger.class), any(), eq(false));
…om VPJ to servers

Add support for propagating the real KILL caller information (trigger reason + details) from VenicePushJob through Controllers to Servers, enabling better observability and debugging of push job terminations.

Key changes:
- Add PushJobKillTrigger enum with trigger types: USER_REQUEST, VERSION_RETIREMENT, SLA_VIOLATION, PREEMPTED_BY_FULL_PUSH, INGESTION_FAILURE, VERSION_CREATION_FAILURE, PUSH_JOB_FAILED, LINGERING_VERSION_TOPIC, UNKNOWN
- Add AdminOperation v96 schema with trigger/details fields in KillOfflinePushJob
- Add ParticipantMessageValue v2 schema with trigger/details fields in KillPushJob
- Update Admin.killOfflinePush() and StoreCleaner.deleteOneStoreVersion() interfaces
- Propagate trigger/details through VeniceHelixAdmin and VeniceParentHelixAdmin
- Add PUSH_JOB_KILL_TRIGGER and PUSH_JOB_KILL_DETAILS controller API params
- VPJ uses SLA_VIOLATION for timeout kills, PUSH_JOB_FAILED for error cleanup
- Add SLA_TIMEOUT error type
Force-pushed: d55a12d → 6c69595
      pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.KILLED.getValue()));
    }
    pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
    sendPushJobDetailsToController();
Can we also propagate the kill trigger down when we report push details to the controller as well? That way we can filter out sla violations and user triggered kills in the push job dashboard
     * TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
     */
    - ADMIN_OPERATION(94, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
    + ADMIN_OPERATION(96, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
Should we separate out protocol upgrades and bumping up the version to separate prs?
Summary
- Adds a PushJobKillTrigger enum to capture the reason for push job termination (USER_REQUEST, VERSION_RETIREMENT, SLA_VIOLATION, PREEMPTED_BY_FULL_PUSH, INGESTION_FAILURE, VERSION_CREATION_FAILURE, PUSH_JOB_FAILED, LINGERING_VERSION_TOPIC, UNKNOWN)
- Adds trigger and details fields to the AdminOperation (v96) KillOfflinePushJob message
- Adds trigger and details fields to the ParticipantMessageValue (v2) KillPushJob message

This enables better observability and debugging of push job terminations by tracking the actual reason for the kill.

Changes
- PushJobKillTrigger enum with all trigger types
- Updated Admin.killOfflinePush() to accept trigger and details
- Updated StoreCleaner.deleteOneStoreVersion() to accept isForcedDelete and deleteDueToError flags

Test plan