[server] Add lkc heartbeat header for active-key-count consistency check across replicas#2844
Open
m-nagarajan wants to merge 1 commit into
Open
Conversation
across replicas When the active-key-count replica consistency check is enabled, the A/A leader attaches an `lkc` (leader key count) PubSub header on every VT heartbeat carrying its current active key count. Followers compare against their own count when processing the heartbeat; a mismatch records `ingestion.key.active_count_mismatch_across_replicas` (Tehuti + OTel) and emits a rate-limited WARN log. The follower's count is NOT invalidated — the check is diagnostic-only. Design notes: - New config `server.active.key.count.replica.consistency.check.enabled` (default false). Effective only when `server.active.key.count.for.hybrid.store.enabled` is also true; the `VeniceServerConfig` accessor ANDs the two so registration and recording sites can't drift. - `PartitionConsumptionState.lastVTProduceCallFuture` switched from a volatile `CompletableFuture<Void>` to a `final AtomicReference< CompletableFuture<Void>>` with `swapLastVTProduceCallFuture(next)` returning the prior head. The new swap rejects null. `sendIngestionHeartbeatToVT` chains the HB send behind the previous chain entry via `whenCompleteAsync` so the lkc lands at the broker after preceding view-writer data writes. - The lkc value is read synchronously on the SIT thread BEFORE the swap and captured in the chained-callback lambda. Reading from inside the deferred callback would observe later SIT increments for records queued AFTER this HB → false-positive mismatches. - The chained callback propagates the upstream chain failure (skipping the HB to avoid stamping records that never landed) and surfaces synchronous `sendIngestionHeartbeat` failures into the chain head; async broker rejections are logged separately and don't gate the chain (matches `queueUpVersionTopicWritesWithViewWriters` enqueue-completion semantics). - `closeVeniceViewWriters` now short-circuits the chain head unconditionally so hybrid A/A configs without view writers (which can still grow the chain via the HB swap) don't leave a pending head orphaned at close. `checkAndWaitForLastVTProduceFuture` bounds the EOP-time wait with `VIEW_WRITER_CLOSE_TIMEOUT_IN_MS` and surfaces `TimeoutException` with explicit attribution at the caller. - `decodeLeaderKeyCountHeaderValue` throws `IllegalArgumentException` on wrong-length payloads rather than returning a `Long.MIN_VALUE` sentinel that would collide with a legitimately round-trippable value. - `ActiveKeyCountInvalidationReason` now implements `VeniceDimensionInterface` and is wired as a dimension on the existing `ingestion.key.active_count_invalidation` metric. Three corruption variants were renamed for clarity: `CORRUPT_KCS_SIGNAL_VALUE → CORRUPT_KEY_COUNT_SIGNAL_HEADER_VALUE`, `CORRUPT_KCS_HEADER_LENGTH → CORRUPT_KEY_COUNT_SIGNAL_HEADER_LENGTH`, and the new `CORRUPT_LEADER_KEY_COUNT_HEADER_LENGTH` for malformed lkc. The enum guards `getMessage` vs `getMessage(int)` against mismatched overload usage via a constructor-computed `templateWithExtraData` flag. - `VeniceWriter.sendHeartbeat` gained a 7-arg overload that attaches an optional extra PubSub header. The 6-arg overload still exists and delegates to the 7-arg with null. Tests migrated to stub the 7-arg signature; production code calls the 7-arg form unconditionally. - `EmptyPubSubMessageHeaders` singleton is promoted to a fresh mutable instance before appending the extra header. Wire constants moved to `PubSubMessageHeaders`: `VENICE_KEY_COUNT_SIGNAL_HEADER` (kcs) and `VENICE_LEADER_KEY_COUNT_HEADER` (lkc). Test coverage: - `ActiveKeyCountHeartbeatTest`: encode/decode round-trip including Long.MIN_VALUE and Long.MAX_VALUE; `buildLeaderActiveKeyCountHeader` gating (check disabled, pre-EOP, untracked, null PCS, happy path); `compareLeaderActiveKeyCountOnHeartbeat` branches (check disabled, pre-EOP, follower not tracking, header absent, leader sentinel, match, mismatch, corrupt length); `sendIngestionHeartbeatToVT` ordering, upstream-failure propagation, sync send-failure propagation. - `ActiveKeyCountInvalidationReasonTest`: dimension fixture for all 7 enum values. - `ActiveKeyCountScenarioTest`: end-to-end Tehuti+OTel parity for both `recordActiveKeyCountInvalidation` and `recordActiveKeyCountMismatchAcrossReplicas`, plus cross-system isolation. - `PartitionConsumptionStateTest`: 32-thread chain atomicity test for `swapLastVTProduceCallFuture`; null-arg NPE test. - `LeaderFollowerStoreIngestionTaskTest`: bounded EOP-wait timeout, empty-view-writers + pending chain head short-circuit, view-writer/ CM-write chain-failure-propagation tests. - `IngestionOtelMetricEntityTest` / `IngestionOtelStatsTest` / `ServerMetricEntityTest` / `VeniceMetricsDimensionsTest`: updated for the new dimension and the new mismatch metric entity.
There was a problem hiding this comment.
Pull request overview
This PR adds a diagnostic-only consistency check for active key count tracking across A/A hybrid replicas by stamping the leader’s active key count onto VT heartbeats and having followers compare against their locally maintained count. The check is gated behind a new server config and emits operator-visible signals (Tehuti + OTel metrics plus rate-limited WARN logs) without mutating/invalidation of follower state on mismatches.
Changes:
- Add
lkc(leader key count) PubSub header on VT heartbeats (gated) and compare it on followers to record a new cross-replica mismatch metric + WARN log. - Strengthen VT produce-call ordering/chain semantics (atomic swap of chain head; bounded EOP wait; robust close short-circuiting).
- Extend metrics plumbing: new OTel dimension for active-key-count invalidation reasons and a new mismatch metric entity, plus broad unit test coverage.
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds unit coverage for the new sendHeartbeat overload that can attach an extra PubSub header. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Adds a sendHeartbeat overload that appends an optional extra PubSub header (used for lkc). |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java | Introduces wire constants for kcs and lkc headers. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Adds config key server.active.key.count.replica.consistency.check.enabled with documentation. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java | Updates dimension name formatting tests for the new active-key-count invalidation reason dimension. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java | Adds VENICE_ACTIVE_KEY_COUNT_INVALIDATION_REASON as a new dimension. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java | Updates expected server metric entity count for newly added metric entity. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java | Updates OTel stats tests for new constructor signature and invalidation reason dimension recording. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java | Updates metric entity expectations for invalidation reason dimension and new mismatch metric entity. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java | Updates mocks/verifications for VeniceWriter.sendHeartbeat signature change. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Adds concurrency regression tests for atomic swap of the VT produce-call chain head. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java | Adds regression tests around chain completion, bounded wait timeout, and close short-circuit behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java | Updates tests to use the new kcs header constant and revised invalidation recording signature. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountScenarioTest.java | Adds parity/isolation tests for mismatch vs invalidation across Tehuti and OTel. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountInvalidationReasonTest.java | Adds dimension fixture test for ActiveKeyCountInvalidationReason values and naming. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountHeartbeatTest.java | Adds end-to-end unit tests for lkc header encode/decode, gating, compare logic, and VT-heartbeat chaining behavior. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java | Updates no-op implementation to match new active-key-count invalidation/mismatch recording APIs. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java | Adds reason-dimensioned invalidation counter and gated mismatch counter; updates constructor API accordingly. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java | Updates invalidation metric description/dimensions and adds mismatch metric entity. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java | Adds Tehuti rate gauge for mismatch across replicas, gated by the consistency-check config. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java | Wires mismatch gating into per-store OTel stats and adds mismatch/invalidation-with-reason recording entry points. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Moves kcs constant to wire headers, adds lkc encode/decode helpers, records mismatch metric, and adds invalidation reason dimension plumbing. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Replaces volatile chain head with AtomicReference + atomic swap method to preserve chaining invariants. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Implements lkc attach/compare logic, chains VT heartbeat behind prior VT writes, adds bounded EOP wait, and improves close short-circuiting. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountInvalidationReason.java | Renames/extends invalidation reasons, adds message-format guarding, and implements VeniceDimensionInterface. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java | Updates A/A leader kcs signal headers to use new wire header constant. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Adds config accessor for replica-consistency check that ANDs with hybrid active-key-count tracking enablement. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+587
to
+594
| * <p>Covers two configurations to exercise both branches of the singleton-promotion in the writer: | ||
| * <ul> | ||
| * <li>{@code addLeaderCompleteState=false}: {@code getHeaders} returns the immutable | ||
| * {@link EmptyPubSubMessageHeaders} singleton — writer must promote to a mutable instance before | ||
| * appending {@code lkc}, else the singleton's {@code add} throws.</li> | ||
| * <li>{@code addLeaderCompleteState=true}: {@code getHeaders} already returns a mutable instance with the | ||
| * LCS + VTP headers; appending {@code lkc} should work directly without allocation.</li> | ||
| * </ul> |
| /* ---------- 4. ActiveKeyCountInvalidationReason message formatting ---------- */ | ||
|
|
||
| @Test | ||
| public void testCorruptLakcReasonRendersLength() { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem Statement
In an A/A hybrid store, the active key count on a partition is tracked independently on the leader (which produces to the version topic) and on each follower (which consumes from the version topic and re-applies the leader-stamped
kcssignal). Today there is no operator-visible signal when the two views diverge — a kcs signal lost in flight, mis-stamped on the wire, or mis-applied on the follower would silently corrupt the follower's count and only surface much later via downstream symptoms.We need a non-intrusive, observation-only consistency check that operators can enable per cluster and that emits a dashboard signal when leader and follower disagree.
Solution
When the new consistency check is enabled, the A/A leader attaches an
lkc(leader key count) PubSub header to every VT heartbeat carrying its currentactiveKeyCount. Followers consuming that heartbeat compare lkc against their own count and, on mismatch, recordingestion.key.active_count_mismatch_across_replicas(both Tehuti and OTel) plus a rate-limited WARN log. The follower's count is NOT invalidated — the check is diagnostic-only during rollout.Key design points:
server.active.key.count.replica.consistency.check.enabled(defaultfalse). TheVeniceServerConfigaccessor ANDs it withisActiveKeyCountForHybridStoreEnabled()so sensor registration and recording sites can't drift.PartitionConsumptionState.lastVTProduceCallFutureswitched from avolatile CompletableFutureto afinal AtomicReference<CompletableFuture<Void>>with an atomicswapLastVTProduceCallFuture(next)that returns the prior head and rejects null.sendIngestionHeartbeatToVTchains the HB send behind the previous chain entry so the lkc lands at the broker after preceding view-writer data writes.sendIngestionHeartbeatfailures into the chain head. Async broker rejections are logged separately and don't gate the chain — matchesqueueUpVersionTopicWritesWithViewWritersenqueue-completion semantics.closeVeniceViewWritersnow short-circuits the chain head unconditionally so hybrid A/A configs without view writers (which can still grow the chain via the HB swap) don't leave a pending head orphaned at close.checkAndWaitForLastVTProduceFuturebounds the EOP-time wait withVIEW_WRITER_CLOSE_TIMEOUT_IN_MSand surfacesTimeoutExceptionwith explicit attribution at the caller.decodeLeaderKeyCountHeaderValuethrowsIllegalArgumentExceptionon wrong-length payloads instead of returning aLong.MIN_VALUEsentinel that would collide with a legitimately round-trippable value.ActiveKeyCountInvalidationReasonnow implementsVeniceDimensionInterfaceand is wired as a dimension on the existingingestion.key.active_count_invalidationmetric. Three corruption variants were renamed for clarity:CORRUPT_KCS_SIGNAL_VALUE → CORRUPT_KEY_COUNT_SIGNAL_HEADER_VALUE,CORRUPT_KCS_HEADER_LENGTH → CORRUPT_KEY_COUNT_SIGNAL_HEADER_LENGTH, and a newCORRUPT_LEADER_KEY_COUNT_HEADER_LENGTHfor malformed lkc. The enum guardsgetMessagevsgetMessage(int)against mismatched overload usage via a constructor-computedtemplateWithExtraDataflag.VeniceWriter.sendHeartbeatgained a 7-arg overload that attaches an optional extra PubSub header. The 6-arg overload still exists and delegates to the 7-arg withnull. Tests migrated to the 7-arg stub; production calls the 7-arg form unconditionally.EmptyPubSubMessageHeadersis promoted to a fresh mutable instance before appending the extra header.PubSubMessageHeaders.VENICE_KEY_COUNT_SIGNAL_HEADER(kcs) andVENICE_LEADER_KEY_COUNT_HEADER(lkc).Code changes
server.active.key.count.replica.consistency.check.enabled(defaultfalse). Effective only whenserver.active.key.count.for.hybrid.store.enabled=trueis also on.REDUNDANT_LOGGING_FILTER; the new HB-chain-stall ERROR re-uses the existing close/EOP error path.Concurrency-Specific Checks
AtomicReferencewithgetAndSetlinearization; the lkc value is captured on SIT before the swap so deferred callbacks don't observe later increments; chain failures propagate to downstream waiters..get(...)atVIEW_WRITER_CLOSE_TIMEOUT_IN_MSso a stuck common-pool callback can't hang ingestion.VeniceConcurrentHashMapfor per-partition state;AtomicReferencefor the chain head).whenCompleteAsynccallback wraps the send in atry/catch (Throwable); the setup is wrapped intry/finallywith acallbackScheduledguard so a registration-time throw still completes the chain head exceptionally.How was this PR tested?
VeniceWriter.sendHeartbeatretained).Coverage highlights:
ActiveKeyCountHeartbeatTest: encode/decode round-trip includingLong.MIN_VALUE/MAX_VALUE;buildLeaderActiveKeyCountHeadergating (check disabled, pre-EOP, untracked, null PCS, happy path);compareLeaderActiveKeyCountOnHeartbeatbranches (disabled, pre-EOP, follower not tracking, header absent, leader sentinel, match, mismatch, corrupt length);sendIngestionHeartbeatToVTordering, upstream-failure propagation, sync send-failure propagation.ActiveKeyCountInvalidationReasonTest: dimension fixture for all 7 enum values.ActiveKeyCountScenarioTest: end-to-end Tehuti+OTel parity for bothrecordActiveKeyCountInvalidationandrecordActiveKeyCountMismatchAcrossReplicas, plus cross-system isolation.PartitionConsumptionStateTest: 32-thread chain atomicity test forswapLastVTProduceCallFuture; null-arg NPE test.LeaderFollowerStoreIngestionTaskTest: bounded EOP-wait timeout, empty-view-writers + pending chain head short-circuit, view-writer/CM-write chain-failure-propagation tests.IngestionOtelMetricEntityTest/IngestionOtelStatsTest/ServerMetricEntityTest/VeniceMetricsDimensionsTest: updated for the new dimension and the new mismatch metric entity.Does this PR introduce any user-facing or breaking changes?
The feature is gated behind a new config that defaults to
false. With the flag off, behavior is identical to before the PR (the only observable change is the rename of three internal enum constants on the existing invalidation metric's new dimension, and the new dimension itself only appears whenisActiveKeyCountForHybridStoreEnabledoperators consume that metric).