[da-vinci] Flush Global RT DIV state on graceful shutdown #2836
KaiSernLim wants to merge 4 commits into
Global RT DIV persists RT/VT DIV state through byte-threshold-triggered consumer/leader paths and deliberately disables the drainer SYNC_OFFSET sync during graceful shutdown. As a result, the in-memory RT/VT DIV deltas accumulated since the last threshold-triggered sync were never flushed on a graceful stop, causing a post-restart bootstrap slowdown. The non-Global-RT-DIV (OffsetRecord) path already closes this gap via the drainer SYNC_OFFSET command.
Add an on-demand forceGlobalRtDivSync(pcs) invoked from executeShutdownRunnable, gated by the existing isServerIngestionCheckpointDuringGracefulShutdownEnabled config:
- Leader: produce a GlobalRtDivState per RT source broker (skipping brokers whose LCRP is EARLIEST) via a position-based sendGlobalRtDivMessage variant; the produce callback chains the VT DIV + LCVP sync, covering both halves.
- Follower / no-RT-progress: enqueue a waitable SYNC_GLOBAL_RT_DIV drainer command that snapshots VT DIV in the drainer thread and syncs it to the OffsetRecord, guarded against persisting EARLIEST.
forceGlobalRtDivSync returns an aggregate future; the shutdown await reuses waitForSyncOffsetCmd's timeout/cancel semantics so shutdown never hangs on a produce/await failure.
Runs inside shutdownPartitionConsumptionStates() (after consumerBatchUnsubscribeAllTopics, before closeVeniceWriters), so the VeniceWriter is alive and no new RT records are arriving.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
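The leader / follower split described in this commit message can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `ForceSyncSketch`, the `long` positions, the `EARLIEST` sentinel value, and the two callback parameters are all hypothetical stand-ins for the real Venice types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical sketch of the shutdown flush branching; not the Venice implementation. */
final class ForceSyncSketch {
  /** Assumed sentinel for "no RT progress yet"; the real code uses a position object. */
  static final long EARLIEST = -1L;

  private ForceSyncSketch() {
  }

  static CompletableFuture<Void> forceSync(
      boolean isLeader,
      Map<String, Long> positionByBroker,
      BiFunction<String, Long, CompletableFuture<Void>> produceRtDiv,
      Supplier<CompletableFuture<Void>> enqueueVtSnapshotSync) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    if (isLeader) {
      for (Map.Entry<String, Long> entry: positionByBroker.entrySet()) {
        if (entry.getValue() == EARLIEST) {
          continue; // nothing consumed from this broker, so nothing to flush
        }
        futures.add(produceRtDiv.apply(entry.getKey(), entry.getValue()));
      }
    }
    if (futures.isEmpty()) {
      // Follower, or a leader with no RT progress: sync only the VT DIV snapshot.
      futures.add(enqueueVtSnapshotSync.get());
    }
    // The aggregate completes once every per-broker (or fallback) future completes.
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }
}
```

The aggregate future fails if any underlying future fails, which is why the caller in this sketch would be expected to treat the result as best-effort.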
Pull request overview
This PR addresses a graceful-shutdown gap in Global RT DIV state persistence by adding a best-effort, on-demand flush path that runs during shutdown to persist accumulated RT/VT DIV deltas that would otherwise be lost until the next byte-threshold-triggered sync (causing slower post-restart bootstrap).
Changes:
- Updated graceful shutdown checkpointing to invoke a Global-RT-DIV-specific flush (`forceGlobalRtDivSync`) when Global RT DIV is enabled, while preserving the existing SYNC_OFFSET drainer path for non-Global-RT-DIV.
- Added a new waitable drainer command (`SYNC_GLOBAL_RT_DIV`) that snapshots VT DIV in the drainer thread and persists it via the OffsetRecord sync path.
- Refactored `sendGlobalRtDivMessage` to accept an explicit position + topic-partition and return the `LeaderProducerCallback` so shutdown code can wait on persistence completion; added unit tests covering leader/follower shutdown sync behaviors and EARLIEST guards.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Routes graceful shutdown to either forceGlobalRtDivSync (Global RT DIV) or SYNC_OFFSET (non-Global), and adds drainer-thread VT snapshot sync (syncGlobalRtDivFromSnapshot). |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Implements the shutdown-time Global RT DIV flush (leader per-broker produce + follower VT snapshot fallback) and refactors sendGlobalRtDivMessage to a position-based signature returning the producer callback. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java | Adds a new waitable SYNC_GLOBAL_RT_DIV command type and plumbing to enqueue/execute it. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractStoreBufferService.java | Extends the abstract API with execSyncGlobalRtDivCommandAsync. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SeparatedStoreBufferService.java | Delegates the new execSyncGlobalRtDivCommandAsync to the appropriate underlying buffer service. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java | Exposes LeaderProducedRecordContext via a getter so shutdown code can wait on persistence. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java | Adds shutdown-path unit tests specific to Global RT DIV sync invocation and timeout-bounded waiting behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java | Adds unit test ensuring SYNC_GLOBAL_RT_DIV routes to syncGlobalRtDivFromSnapshot and remains non-hanging when PCS is null. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java | Adds unit tests for leader per-broker forced sync, EARLIEST-skip behavior, follower VT-only forced sync, and EARLIEST LCVP guard in snapshot sync. |
…nd type
Replace the SYNC_GLOBAL_RT_DIV CommandType with a dedicated drainer node,
mirroring the existing SyncVtDivNode, so the follower / no-RT-progress
shutdown path reads as "enqueue a waitable VT DIV sync node" rather than
overloading the CommandQueueNode "command" abstraction with a second,
behaviorally-different meaning.
- Add SyncGlobalRtDivNode whose execute() calls syncGlobalRtDivFromSnapshot,
wired into the drainer loop alongside SyncVtDivNode.
- Extract a WaitableQueueNode base holding the LockAssistedCompletableFuture
and the lock-guarded executeGuarded(), shared by CommandQueueNode and
SyncGlobalRtDivNode, preserving the "once executing, cannot be cancelled"
guarantee waitForSyncOffsetCmd relies on (no duplication).
- Revert CommandType to { SYNC_OFFSET } and processCommand /
execSyncOffsetCommandAsync to their original single-purpose forms.
- Rename the enqueue method execSyncGlobalRtDivCommandAsync ->
execSyncGlobalRtDivAsync (it no longer routes through CommandQueueNode).
- Consolidate the null-PCS guard into syncGlobalRtDivFromSnapshot, dropping
the redundant pre-check in processCommand.
Tests: testExecSyncGlobalRtDivAsync now also asserts the future completes
exceptionally (shutdown never hangs) when the sync throws; the
syncGlobalRtDivFromSnapshot test gains a null-PCS case.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
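To illustrate the "once executing, cannot be cancelled" guarantee that this commit moves into a shared base class, here is a minimal sketch. It is an assumption-laden stand-in: `WaitableNodeSketch`, `executeGuarded`, and `cancelIfNotStarted` are hypothetical names, and a plain `ReentrantLock` plus `CompletableFuture` replaces the `LockAssistedCompletableFuture` used in the real code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a waitable drainer node; not the actual WaitableQueueNode. */
class WaitableNodeSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final CompletableFuture<Void> future = new CompletableFuture<>();
  private final Runnable work;

  WaitableNodeSketch(Runnable work) {
    this.work = work;
  }

  CompletableFuture<Void> getFuture() {
    return future;
  }

  /** Called by the drainer thread. The lock is held for the whole execution. */
  void executeGuarded() {
    lock.lock();
    try {
      if (future.isDone()) {
        return; // cancelled before it started, so the work is skipped
      }
      try {
        work.run();
        future.complete(null);
      } catch (RuntimeException e) {
        future.completeExceptionally(e); // the waiter is released even on failure
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Called by the waiter, for example on timeout. It blocks while the node is
   * executing, so a node that has started always runs to completion.
   */
  boolean cancelIfNotStarted() {
    lock.lock();
    try {
      return future.cancel(true);
    } finally {
      lock.unlock();
    }
  }
}
```

Because both methods take the same lock, a cancel either wins before execution begins or observes an already-completed future and returns `false`.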
// shutdownPartitionConsumptionStates() (after consumerBatchUnsubscribeAllTopics, before closeVeniceWriters),
// so the VeniceWriter is alive and no new RT records are arriving. Both paths await with the shutdown
// sync-offset timeout, reusing waitForSyncOffsetCmd's timeout/cancel semantics so shutdown never hangs.
CompletableFuture<Void> syncFuture = isGlobalRtDivEnabled()
Not a question, just a thought to confirm. Since consumerBatchUnsubscribeAllTopics is called earlier in this workflow, I assume no consumer thread can poll or process any data at this stage. In other words, all consumer DIV states are fixed and unlikely to change, so there should be no race between the consumer thread and the shutdown thread here?
🤖 Correct. executeShutdownRunnable runs inside shutdownPartitionConsumptionStates(), which is after consumerBatchUnsubscribeAllTopics() — so no consumer thread is polling or processing for these partitions, the consumer DIV state is frozen, and there's no race with the shutdown thread (or the drainer-side snapshot). That window is exactly what this relies on, and it's also why the steady-state drainer SYNC_OFFSET path is intentionally disabled for Global RT DIV — it would otherwise interfere while consumers are still live.
PartitionTracker vtDivSnapshot =
getDataIntegrityValidator().cloneVtProducerStates(partition, true, pcs.getLatestMessageTimeInMs());
Why do we take a consumer div snapshot in the drainer thread? It doesn't seem wrong, but it's a bit different from the regular VT div pattern.
🤖 At shutdown there's no live consumer record driving the sync, so unlike the steady-state path — where the consumer thread clones the snapshot and hands it to a fire-and-forget SyncVtDivNode gated on lastQueuedRecordPersistedFuture — here we clone inside the drainer thread. Because the drainer is single-threaded and processes a partition's records FIFO, by the time this node runs every previously-queued record for the partition has already been persisted, so the snapshot is consistent without needing that future dependency. The actual write still goes through the same updateAndSyncOffsetFromSnapshot the regular path uses.
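The FIFO argument in this reply can be modelled with a single-threaded executor. The sketch below is only an analogy under stated assumptions: `FifoDrainerSketch` and its methods are hypothetical, and a list of integers stands in for persisted records and DIV state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model of a single-threaded FIFO drainer; not Venice's StoreBufferService. */
class FifoDrainerSketch {
  // Only touched from the drainer thread, so no extra synchronization is needed.
  private final List<Integer> persisted = new ArrayList<>();
  private final ExecutorService drainer = Executors.newSingleThreadExecutor();

  /** Queues a record; the drainer "persists" it when its turn comes. */
  void enqueueRecord(int record) {
    drainer.execute(() -> persisted.add(record));
  }

  /**
   * Queues a node that takes its snapshot when it runs on the drainer thread,
   * which is after every record queued before it has been persisted.
   */
  CompletableFuture<List<Integer>> enqueueSnapshotNode() {
    return CompletableFuture.supplyAsync(() -> new ArrayList<>(persisted), drainer);
  }

  void shutdown() {
    drainer.shutdown();
  }
}
```

Since the executor runs tasks sequentially in submission order, the snapshot needs no extra "last record persisted" dependency in this model; records queued after the snapshot node are simply not part of it.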
…hutdown sync
The leader graceful-shutdown path produced its GlobalRtDivState to the local VT (whose produce-completion callback already enqueues a fire-and-forget SyncVtDivNode to sync the VT DIV + LCVP), and then ALSO enqueued a second, waitable SyncGlobalRtDivNode purely to obtain a future to await. That second sync re-cloned and re-wrote the same OffsetRecord state, which is redundant work on every broker at shutdown.
Make SyncVtDivNode extend WaitableQueueNode so its drainer-side execution is awaitable, and have the leader await the produce-completion node directly:
- StoreBufferService.execSyncOffsetFromSnapshotAsync now returns the node's completion future (steady-state callers ignore it).
- sendVtDivSnapshotOnCompletion returns a relay future that completes when the drainer-side VT DIV sync runs; it also fails the relay if the produce/persist fails, since the produce-completion callback only fires on success (so the shutdown await never hangs).
- sendGlobalRtDivMessage returns that future; forceGlobalRtDivSync's leader branch awaits it instead of chaining a second SyncGlobalRtDivNode.
The follower / no-RT-progress branch still uses the waitable SyncGlobalRtDivNode.
The sync stays on the single-threaded drainer: syncOffset (storageEngine.sync + storageMetadataService.put) must remain serialized with record processing, so it cannot move to the SIT thread (same reason the non-Global SYNC_OFFSET path enqueues into the drainer).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
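A relay future of the kind described in this commit might look like the sketch below. It is a simplification under assumptions: `RelayFutureSketch`, `persistFuture`, and `enqueueDrainerSync` are hypothetical names, and a single future stands in for both the produce-completion callback and the persist signal, which the real code keeps separate.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of a relay future; names do not match the Venice implementation. */
final class RelayFutureSketch {
  private RelayFutureSketch() {
  }

  /**
   * @param persistFuture completes when the produced record is persisted, or fails on produce error
   * @param enqueueDrainerSync invoked only on success; returns the drainer-side sync node's future
   */
  static CompletableFuture<Void> relay(
      CompletableFuture<Void> persistFuture,
      Supplier<CompletableFuture<Void>> enqueueDrainerSync) {
    CompletableFuture<Void> relay = new CompletableFuture<>();
    persistFuture.whenComplete((ignored, produceError) -> {
      if (produceError != null) {
        relay.completeExceptionally(produceError); // fail fast; no sync is enqueued
        return;
      }
      CompletableFuture<Void> syncFuture;
      try {
        syncFuture = enqueueDrainerSync.get();
      } catch (RuntimeException e) {
        relay.completeExceptionally(e); // enqueue itself failed
        return;
      }
      // Mirror the drainer-side outcome onto the relay.
      syncFuture.whenComplete((ignoredToo, syncError) -> {
        if (syncError != null) {
          relay.completeExceptionally(syncError);
        } else {
          relay.complete(null);
        }
      });
    });
    return relay;
  }
}
```

Every path through the callback completes the relay, so a waiter in this sketch is released on success, produce failure, enqueue failure, or sync failure.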
persistedToDBFuture on produce error
Two robustness fixes from the Copilot review on the graceful-shutdown Global RT DIV flush:
- StoreIngestionTask.updateAndSyncOffsetFromSnapshot: guard against a null PCS. The PCS can be removed between a SyncVtDivNode being enqueued and executed (shutdown/unsubscribe); skip cleanly rather than NPE so the waitable node completes normally and the shutdown await stays deterministic. Mirrors the existing guard in syncGlobalRtDivFromSnapshot.
- LeaderProducerCallback.onCompletion: on produce failure, complete persistedToDBFuture exceptionally. Previously it was left uncompleted, so waiters blocked: leader topic-switch's getLastLeaderPersistFuture().get() hangs indefinitely, and the shutdown Global RT DIV relay (sendVtDivSnapshotOnCompletion, which keys its fail-fast off this future) would hang until the shutdown timeout. This is what actually makes the relay fail fast on produce failure.
Adds unit tests for both.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
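The second fix amounts to never leaving a future uncompleted on the error path. A heavily simplified sketch, assuming a callback shape that is not the real `LeaderProducerCallback` (the class name and the single-argument `onCompletion` are hypothetical):

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, heavily simplified producer callback; not Venice's LeaderProducerCallback. */
class ProducerCallbackSketch {
  private final CompletableFuture<Void> persistedToDBFuture = new CompletableFuture<>();

  CompletableFuture<Void> getPersistedToDBFuture() {
    return persistedToDBFuture;
  }

  /** Invoked by the producer; a null argument means the produce succeeded. */
  void onCompletion(Exception produceError) {
    if (produceError != null) {
      // Without this, anyone blocked on the future waits until their own timeout, or forever.
      persistedToDBFuture.completeExceptionally(produceError);
      return;
    }
    // Assumption for brevity: success completes the future here. In a real ingestion
    // pipeline the completion would presumably happen later, after the record is persisted.
    persistedToDBFuture.complete(null);
  }
}
```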
Problem Statement
Venice's Global RT DIV feature persists DIV (Data Integrity Validation) state through two consumer/leader-driven paths, both gated by byte-count thresholds during steady-state ingestion (the RT DIV produce-to-VT path and the VT DIV snapshot path).
On a graceful shutdown, `executeShutdownRunnable()` and `updateOffsetMetadataAndSyncOffset()` both explicitly bail when `isGlobalRtDivEnabled()` (deliberately deferred: Global RT DIV is consumer-driven, so any drainer sync must be disabled to not interfere). As a result, nothing flushes the in-memory RT/VT DIV deltas accumulated since the last threshold-triggered sync. On restart the server replays from the last synced point, causing a post-restart bootstrap slowdown. The non-Global-RT-DIV (OffsetRecord) path already closes this gap via the drainer `SYNC_OFFSET` command.
Solution
Add a single on-demand DIV sync, `forceGlobalRtDivSync(pcs)`, that flushes both the RT and VT DIV state, invoked during graceful shutdown, mirroring what the OffsetRecord path already does. It is gated by the existing `isServerIngestionCheckpointDuringGracefulShutdownEnabled` server config (no new flag).
- Leader: for each RT source broker whose LCRP is not `EARLIEST`, produce a `GlobalRtDivState` via a position-based `sendGlobalRtDivMessage` variant. The produce callback already chains the VT DIV + LCVP sync, so both halves are covered.
- Follower / no-RT-progress: enqueue a waitable `SYNC_GLOBAL_RT_DIV` drainer command that snapshots VT DIV in the drainer thread and syncs it to the OffsetRecord, guarded against persisting `EARLIEST`. RT DIV is already durable from when the follower consumed `GlobalRtDivState`.
`forceGlobalRtDivSync` returns an aggregate `CompletableFuture`. The shutdown await reuses `waitForSyncOffsetCmd`'s timeout/cancel semantics (bounded by `getShutdownSyncOffsetTimeoutMs()`), so shutdown never hangs on a produce/await failure. The flush runs inside `shutdownPartitionConsumptionStates()`, after `consumerBatchUnsubscribeAllTopics` (no new RT records arriving) and before `closeVeniceWriters` (VeniceWriter still alive), which is exactly the window where the "don't interfere" concern no longer applies.
Trade-offs: the work is bounded by RT broker count, runs only on graceful shutdown (not a steady-state hot path), and is best-effort (failures are logged, never thrown, and capped by the shutdown timeout).
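The best-effort, timeout-bounded await can be sketched as below. This is not the body of `waitForSyncOffsetCmd`; `ShutdownAwaitSketch` and `awaitBestEffort` are hypothetical names, and cancelling the future on timeout is an assumption about what "timeout/cancel semantics" means here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a best-effort, timeout-bounded shutdown await. */
final class ShutdownAwaitSketch {
  private ShutdownAwaitSketch() {
  }

  /** Returns true if the flush completed normally within the timeout; never throws. */
  static boolean awaitBestEffort(CompletableFuture<Void> flush, long timeoutMs) {
    try {
      flush.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      flush.cancel(true); // give up so shutdown can proceed
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    } catch (ExecutionException | RuntimeException e) {
      return false; // a real implementation would log the failure here
    }
  }
}
```

Returning a boolean instead of throwing keeps the caller's shutdown sequence linear: a failed or slow flush costs at most `timeoutMs` and then the remaining shutdown steps run as usual.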
Code changes
`isServerIngestionCheckpointDuringGracefulShutdownEnabled` (no new config introduced).
Concurrency-Specific Checks
How was this PR tested?
`SYNC_GLOBAL_RT_DIV` routing).
`sendGlobalRtDivMessage` updated to the position-based signature; steady-state values unchanged).
Test runs (clients/da-vinci-client):
`LeaderFollowerStoreIngestionTaskTest` (149), `StoreBufferServiceTest` (22), and the `SITWith*` subclasses covering `StoreIngestionTask` (322 each): all green, 0 failures.
Does this PR introduce any user-facing or breaking changes?