Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b85fcc6
[server][venice-common] Leader handover: emit graceful-EOS marker on …
sushantmane May 13, 2026
a9671ad
[server][da-vinci] Leader handover: consume the graceful-EOS marker t…
sushantmane May 13, 2026
75b32fa
[server][venice-common] Replace graceful-EOS field with DoL-style Leader
sushantmane May 13, 2026
33c693a
[server] Leader step-down stamp: split produce vs ack-wait, restore i…
sushantmane May 13, 2026
f48d1db
Merge updated PR #2793 (Leader Step-Down stamp redesign) into PR #2794
sushantmane May 13, 2026
1569ddf
[test] Cover emitLeaderStepDownStampIfEnabled in L->Standby path
sushantmane May 13, 2026
1abdaa9
[test] Cover emitLeaderStepDownStampIfEnabled in L->Standby path
sushantmane May 13, 2026
5f52ec8
[server] Address review comments: DIV bypass, sentinel,
sushantmane May 14, 2026
cb83b93
Merge updated PR #2793 review fixes into PR #2794
sushantmane May 14, 2026
d6eae8b
Merge origin/main into PR #2793 to clear conflicts
sushantmane May 14, 2026
ff00c6c
Merge updated PR #2793 (with main merged in) into PR #2794
sushantmane May 14, 2026
ce7f4a5
[server] Emit-side defaults: emit flag off, ack timeout 1s, doc
sushantmane May 14, 2026
a6f863a
Merge PR #2793 emit-default fixes into PR #2794
sushantmane May 14, 2026
05183ee
[server] Gate observeRecordForLeaderHandover on consume flag (hot-pat…
sushantmane May 14, 2026
8a6f7bc
[test] Cover emit-side exception paths + already-STANDBY early return
sushantmane May 15, 2026
a6dd54c
Merge PR #2793 coverage tests into PR #2794
sushantmane May 15, 2026
4930037
[test] Direct unit tests for consume-side stepDownStampObserved +
sushantmane May 15, 2026
078eb22
[server] Clamp step-down stamp ack timeout to non-negative at config-…
sushantmane May 15, 2026
df34a2b
Merge PR #2793 ack-timeout clamp into PR #2794
sushantmane May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
Expand Down Expand Up @@ -714,6 +717,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean useCheckpointedPubSubPositionWithFallback;
private final boolean leaderHandoverUseDoLMechanismForSystemStores;
private final boolean leaderHandoverUseDoLMechanismForUserStores;
// Loaded from SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP (default false); exposed via
// isLeaderHandoverEmitStepDownStampEnabled().
private final boolean leaderHandoverEmitStepDownStamp;
// Loaded from SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP (default false); exposed via
// isLeaderHandoverConsumeStepDownStampEnabled().
private final boolean leaderHandoverConsumeStepDownStamp;
// Loaded from SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS (default 1000 ms); exposed via
// getLeaderHandoverEmitStepDownStampAckTimeoutMs().
private final long leaderHandoverEmitStepDownStampAckTimeoutMs;
private final LogContext logContext;
private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy;

Expand Down Expand Up @@ -1272,6 +1278,12 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, true);
this.leaderHandoverUseDoLMechanismForUserStores =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, true);
this.leaderHandoverEmitStepDownStamp =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP, false);
this.leaderHandoverConsumeStepDownStamp =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP, false);
this.leaderHandoverEmitStepDownStampAckTimeoutMs =
serverProperties.getLong(SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS, 1000L);
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
this.parallelResourceShutdownEnabled =
serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
Expand Down Expand Up @@ -2294,6 +2306,18 @@ public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() {
return this.leaderHandoverUseDoLMechanismForUserStores;
}

/**
 * Reports the emit-side switch for the Leader Step-Down stamp.
 *
 * @return the value of {@code leaderHandoverEmitStepDownStamp} captured when this config was built
 */
public boolean isLeaderHandoverEmitStepDownStampEnabled() {
  return leaderHandoverEmitStepDownStamp;
}

/**
 * Reports the consume-side switch for the Leader Step-Down stamp.
 *
 * @return the value of {@code leaderHandoverConsumeStepDownStamp} captured when this config was built
 */
public boolean isLeaderHandoverConsumeStepDownStampEnabled() {
  return leaderHandoverConsumeStepDownStamp;
}

/**
 * Returns the configured ack timeout, in milliseconds, for the Leader Step-Down stamp emit.
 *
 * <p>NOTE(review): the value is handed back exactly as stored; no range check is visible at
 * this accessor - confirm that negative values are rejected or clamped where the field is set.
 *
 * @return the value of {@code leaderHandoverEmitStepDownStampAckTimeoutMs}
 */
public long getLeaderHandoverEmitStepDownStampAckTimeoutMs() {
  return leaderHandoverEmitStepDownStampAckTimeoutMs;
}

/**
 * Returns the configured ingestion-info log line limit.
 *
 * @return the value of {@code serverIngestionInfoLogLineLimit}
 */
public int getServerIngestionInfoLogLineLimit() {
  return serverIngestionInfoLogLineLimit;
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,46 @@ enum LatchStatus {
*/
private volatile long highestLeadershipTerm = -1;

/**
* The leadership term assigned to this replica when it most recently transitioned
* STANDBY -> LEADER. Sourced from the Helix state-transition message create-timestamp,
* matching the convention already used by the existing DoL stamp (see
* {@code LeaderFollowerPartitionStateModel#onBecomeLeaderFromStandby}). Set to -1 whenever
* this replica is not currently a leader. Used by:
* <ul>
* <li>The Leader Step-Down Stamp emit path on LEADER -> STANDBY/OFFLINE — copied into
* {@code LeaderMetadata.termId} of the stamp so the next leader can compare against
* its own term.</li>
* <li>The consume-side fast-path check on the new leader — used as the
* strictly-greater-than reference when comparing against an observed stamp's
* {@code LeaderMetadata.termId}.</li>
* </ul>
* <p>
* Caveat: the Helix create-timestamp is a wall-clock value and is not strictly monotonic
* across the replica fleet. Two leaders promoted within the same millisecond — or under
* clock skew — could produce indistinguishable terms. Acceptable for the cooperative-only
* fast path; the legacy 5-minute wait is the fallback whenever the term inequality cannot
* be established. Full {@code termId}-based fencing is the structural fix tracked
* separately.
*/
private volatile long currentLeaderTermId = -1;

/**
* The {@code LeaderMetadata.termId} of the most recently observed non-self Leader Step-Down Stamp
* ({@link com.linkedin.venice.message.KafkaKey#LEADER_STEPDOWN_STAMP}) at the tail of this
* partition's local VT, i.e. a stamp whose term differs from {@link #currentLeaderTermId}.
* <ul>
* <li>{@code -1}: no stamp observed, or the most recent stamp was invalidated by a
* subsequent non-DoL, non-self record on local VT.</li>
* <li>{@code >= 0}: the {@code termId} carried by the latest observed stamp; the
* consume-side fast-path check compares this against {@link #currentLeaderTermId}.</li>
* </ul>
* Filled by the drainer when an incoming record on local VT has
* {@code KafkaKey == LEADER_STEPDOWN_STAMP} (which can only happen via a deliberate
* step-down emit, so the stamp's presence is itself the signal &mdash; no separate
* "graceful" flag is needed).
*/
private volatile long lastObservedNonSelfStepDownStampTermId = -1;

/**
* This future is completed in drainer thread after persisting the associated record and offset to DB.
*/
Expand Down Expand Up @@ -903,6 +943,50 @@ public void setHighestLeadershipTerm(long term) {
this.highestLeadershipTerm = term;
}

/**
 * Reads the leadership term currently recorded for this replica.
 *
 * @return the value of {@code currentLeaderTermId}; {@code -1} is the value written by
 *         {@link #clearCurrentLeaderTermId()}
 */
public long getCurrentLeaderTermId() {
  return currentLeaderTermId;
}

/**
 * Stores the leadership term for this replica.
 *
 * @param term the term id to record; written to {@code currentLeaderTermId} without validation
 */
public void setCurrentLeaderTermId(long term) {
  currentLeaderTermId = term;
}

/**
 * Resets {@code currentLeaderTermId} to the {@code -1} sentinel.
 */
public void clearCurrentLeaderTermId() {
  currentLeaderTermId = -1L;
}

/**
 * Tells whether a Leader Step-Down Stamp from another leader's term is currently recorded,
 * i.e. one was observed at the tail of local VT and has not since been invalidated by a
 * subsequent non-DoL, non-self record.
 *
 * @return {@code true} exactly when {@link #getLastObservedNonSelfStepDownStampTermId()}
 *         is not the {@code -1} sentinel
 */
public boolean hasObservedNonSelfStepDownStamp() {
  // Single read of the volatile field, then compare against the "nothing recorded" sentinel.
  final long observedTermId = lastObservedNonSelfStepDownStampTermId;
  return observedTermId != -1L;
}

/**
 * Reads the term id of the last recorded non-self Leader Step-Down Stamp.
 *
 * @return the value of {@code lastObservedNonSelfStepDownStampTermId}; {@code -1} is the value
 *         written by {@link #clearObservedNonSelfStepDownStamp()}
 */
public long getLastObservedNonSelfStepDownStampTermId() {
  return lastObservedNonSelfStepDownStampTermId;
}

/**
 * Remembers the term of a Leader Step-Down Stamp that was just consumed from this partition's
 * local VT. Callers are expected to invoke this only for stamps authored under a different
 * leader term, i.e. when {@code termId} is not equal to {@link #currentLeaderTermId}; this
 * method itself performs no such check.
 *
 * @param termId the {@code termId} carried by the observed stamp
 */
public void recordObservedNonSelfStepDownStamp(long termId) {
  lastObservedNonSelfStepDownStampTermId = termId;
}

/**
 * Drops any previously recorded stamp by resetting the stored term id to {@code -1}.
 * Intended to be called when a non-self, non-DoL record arrives at the VT tail after the last
 * observed stamp, so that a stale marker cannot be replayed against a later transition.
 */
public void clearObservedNonSelfStepDownStamp() {
  lastObservedNonSelfStepDownStampTermId = -1L;
}

/**
 * Replaces the stored last-leader persist future.
 *
 * @param future the future to store in {@code lastLeaderPersistFuture}; stored as given
 */
public void setLastLeaderPersistFuture(Future<Void> future) {
  lastLeaderPersistFuture = future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4343,6 +4343,7 @@ private int internalProcessConsumerRecord(
consumerRecord.getPubSubMessageTime(),
partitionConsumptionState,
consumerRecord.getPubSubMessageHeaders());
observeRecordForLeaderHandover(consumerRecord, kafkaKey, kafkaValue, controlMessage, partitionConsumptionState);
try {
if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue()) {
if (Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
Expand Down Expand Up @@ -4372,6 +4373,7 @@ private int internalProcessConsumerRecord(
partitionConsumptionState,
leaderProducedRecordContext,
currentTimeMs);
observeRecordForLeaderHandover(consumerRecord, kafkaKey, kafkaValue, null, partitionConsumptionState);
}
if (recordLevelMetricEnabled.get()) {
versionedIngestionStats.recordConsumedRecordEndToEndProcessingLatency(
Expand Down Expand Up @@ -4541,8 +4543,14 @@ protected void validateMessage(
boolean tolerateMissingMessagesForRealTimeTopic) {
KafkaKey key = consumerRecord.getKey();
if (key.isControlMessage() && (Arrays.equals(KafkaKey.HEART_BEAT.getKey(), key.getKey())
|| Arrays.equals(KafkaKey.DOL_STAMP.getKey(), key.getKey()))) {
return; // Skip validation for ingestion heartbeat and DoL stamp records.
|| Arrays.equals(KafkaKey.DOL_STAMP.getKey(), key.getKey())
|| Arrays.equals(KafkaKey.LEADER_STEPDOWN_STAMP.getKey(), key.getKey()))) {
/*
* Skip DIV for self-contained control stamps. Heartbeat / DoL / Leader Step-Down stamps
* each use a dedicated type-3 producer GUID with segmentNumber=0, sequenceNumber=0 and do
* not participate in the per-segment DIV chain.
*/
return;
} else if (isGlobalRtDivEnabled() && isRecordSelfProduced(consumerRecord)) {
// Skip validation for self-produced records. If there were any issues, the followers would've reported it already
// e.g. Leader->Follower, resubscribe to local VT, consume messages produced by itself (when it was leader)
Expand Down Expand Up @@ -6613,6 +6621,29 @@ protected boolean shouldUseDolMechanism() {
: serverConfig.isLeaderHandoverUseDoLMechanismEnabledForUserStores();
}

/**
 * Extension point called from the record-processing path for each consumed record, once for
 * control messages (with the parsed {@link ControlMessage}) and once for data records (with
 * {@code controlMessage == null}).
 *
 * <p>This base implementation does nothing. {@link LeaderFollowerStoreIngestionTask} overrides
 * it to maintain the consume-side leader-handover fast-path bookkeeping held on
 * {@link PartitionConsumptionState}.
 *
 * @param consumerRecord the consumed record; carries the source topic / partition / position
 *                       that lets an override restrict the observation to the local VT
 * @param kafkaKey the record's {@link KafkaKey} (control vs. data, and for control records the
 *                 subtype tag such as {@code DOL_STAMP} or {@code HEART_BEAT})
 * @param kafkaValue the record's {@link KafkaMessageEnvelope}, which carries the producer's
 *                   termId
 * @param controlMessage the parsed control message when {@code kafkaKey.isControlMessage()};
 *                       {@code null} for data records
 * @param pcs the consumption state of the partition the record belongs to
 */
protected void observeRecordForLeaderHandover(
    DefaultPubSubMessage consumerRecord,
    KafkaKey kafkaKey,
    KafkaMessageEnvelope kafkaValue,
    ControlMessage controlMessage,
    PartitionConsumptionState pcs) {
  // Intentionally empty: ingestion tasks that are not LeaderFollower have nothing to track here.
}

/**
 * Package-private accessor for the store buffer service held by this task.
 *
 * @return the {@code storeBufferService} field
 */
AbstractStoreBufferService getStoreBufferService() {
  return this.storeBufferService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
private final LongAdderRateGauge batchProcessingRequestRecordsSensor;
private final Sensor batchProcessingRequestLatencySensor;
private final LongAdderRateGauge batchProcessingRequestErrorSensor;
// Count sensor registered as "leader_handover_fast_path"; bumped by recordLeaderHandoverFastPath().
private final Sensor leaderHandoverFastPathSensor;
// Count sensor registered as "leader_handover_legacy_wait"; bumped by recordLeaderHandoverLegacyWait().
private final Sensor leaderHandoverLegacyWaitSensor;
// Count sensor registered as "leader_stepdown_stamp_emit_success"; bumped by
// recordLeaderStepdownStampEmitSuccess().
private final Sensor leaderStepdownStampEmitSuccessSensor;
// Count sensor registered as "leader_stepdown_stamp_emit_failure"; bumped by
// recordLeaderStepdownStampEmitFailure().
private final Sensor leaderStepdownStampEmitFailureSensor;

/**
* @param totalStats the total stats singleton instance, or null if we are constructing the total stats
Expand Down Expand Up @@ -388,6 +392,30 @@ public HostLevelIngestionStats(
() -> totalStats.resubscriptionFailureSensor,
new Count());

this.leaderHandoverFastPathSensor = registerPerStoreAndTotalSensor(
"leader_handover_fast_path",
totalStats,
() -> totalStats.leaderHandoverFastPathSensor,
new Count());

this.leaderHandoverLegacyWaitSensor = registerPerStoreAndTotalSensor(
"leader_handover_legacy_wait",
totalStats,
() -> totalStats.leaderHandoverLegacyWaitSensor,
new Count());

this.leaderStepdownStampEmitSuccessSensor = registerPerStoreAndTotalSensor(
"leader_stepdown_stamp_emit_success",
totalStats,
() -> totalStats.leaderStepdownStampEmitSuccessSensor,
new Count());

this.leaderStepdownStampEmitFailureSensor = registerPerStoreAndTotalSensor(
"leader_stepdown_stamp_emit_failure",
totalStats,
() -> totalStats.leaderStepdownStampEmitFailureSensor,
new Count());

this.leaderProducerSynchronizeLatencySensor = registerPerStoreAndTotalSensor(
"leader_producer_synchronize_latency",
totalStats,
Expand Down Expand Up @@ -652,6 +680,22 @@ public void recordResubscriptionFailure() {
resubscriptionFailureSensor.record();
}

/**
 * Records one occurrence on the {@code leader_handover_fast_path} sensor.
 */
public void recordLeaderHandoverFastPath() {
  this.leaderHandoverFastPathSensor.record();
}

/**
 * Records one occurrence on the {@code leader_handover_legacy_wait} sensor.
 */
public void recordLeaderHandoverLegacyWait() {
  this.leaderHandoverLegacyWaitSensor.record();
}

/**
 * Records one occurrence on the {@code leader_stepdown_stamp_emit_success} sensor.
 */
public void recordLeaderStepdownStampEmitSuccess() {
  this.leaderStepdownStampEmitSuccessSensor.record();
}

/**
 * Records one occurrence on the {@code leader_stepdown_stamp_emit_failure} sensor.
 */
public void recordLeaderStepdownStampEmitFailure() {
  this.leaderStepdownStampEmitFailureSensor.record();
}

/**
 * Records a sample on the leader-producer synchronize latency sensor.
 *
 * @param latency the latency value to record; forwarded unchanged to the sensor
 */
public void recordLeaderProducerSynchronizeLatency(double latency) {
  this.leaderProducerSynchronizeLatencySensor.record(latency);
}
Expand Down
Loading
Loading