Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b85fcc6
[server][venice-common] Leader handover: emit graceful-EOS marker on …
sushantmane May 13, 2026
a9671ad
[server][da-vinci] Leader handover: consume the graceful-EOS marker t…
sushantmane May 13, 2026
75b32fa
[server][venice-common] Replace graceful-EOS field with DoL-style Leader
sushantmane May 13, 2026
33c693a
[server] Leader step-down stamp: split produce vs ack-wait, restore i…
sushantmane May 13, 2026
f48d1db
Merge updated PR #2793 (Leader Step-Down stamp redesign) into PR #2794
sushantmane May 13, 2026
1569ddf
[test] Cover emitLeaderStepDownStampIfEnabled in L->Standby path
sushantmane May 13, 2026
1abdaa9
[test] Cover emitLeaderStepDownStampIfEnabled in L->Standby path
sushantmane May 13, 2026
5f52ec8
[server] Address review comments: DIV bypass, sentinel,
sushantmane May 14, 2026
cb83b93
Merge updated PR #2793 review fixes into PR #2794
sushantmane May 14, 2026
d6eae8b
Merge origin/main into PR #2793 to clear conflicts
sushantmane May 14, 2026
ff00c6c
Merge updated PR #2793 (with main merged in) into PR #2794
sushantmane May 14, 2026
ce7f4a5
[server] Emit-side defaults: emit flag off, ack timeout 1s, doc
sushantmane May 14, 2026
a6f863a
Merge PR #2793 emit-default fixes into PR #2794
sushantmane May 14, 2026
05183ee
[server] Gate observeRecordForLeaderHandover on consume flag (hot-pat…
sushantmane May 14, 2026
8a6f7bc
[test] Cover emit-side exception paths + already-STANDBY early return
sushantmane May 15, 2026
a6dd54c
Merge PR #2793 coverage tests into PR #2794
sushantmane May 15, 2026
4930037
[test] Direct unit tests for consume-side stepDownStampObserved +
sushantmane May 15, 2026
078eb22
[server] Clamp step-down stamp ack timeout to non-negative at config-…
sushantmane May 15, 2026
df34a2b
Merge PR #2793 ack-timeout clamp into PR #2794
sushantmane May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
Expand Down Expand Up @@ -712,6 +715,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean useCheckpointedPubSubPositionWithFallback;
private final boolean leaderHandoverUseDoLMechanismForSystemStores;
private final boolean leaderHandoverUseDoLMechanismForUserStores;
private final boolean leaderHandoverEmitStepDownStamp;
private final boolean leaderHandoverConsumeStepDownStamp;
private final long leaderHandoverEmitStepDownStampAckTimeoutMs;
private final LogContext logContext;
private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy;

Expand Down Expand Up @@ -1268,6 +1274,12 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, true);
this.leaderHandoverUseDoLMechanismForUserStores =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, true);
this.leaderHandoverEmitStepDownStamp =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP, true);
this.leaderHandoverConsumeStepDownStamp =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP, false);
this.leaderHandoverEmitStepDownStampAckTimeoutMs =
serverProperties.getLong(SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS, 5000L);
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
this.parallelResourceShutdownEnabled =
serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
Expand Down Expand Up @@ -2286,6 +2298,18 @@ public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() {
return this.leaderHandoverUseDoLMechanismForUserStores;
}

/**
 * @return the value loaded from {@code SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP}; when
 *         {@code true}, a demoting leader attempts to emit a Leader Step-Down stamp.
 */
public boolean isLeaderHandoverEmitStepDownStampEnabled() {
  return leaderHandoverEmitStepDownStamp;
}

/**
 * @return the value loaded from {@code SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP}; when
 *         {@code true}, a new leader may use an observed Leader Step-Down stamp to bypass the
 *         legacy time-based wait.
 */
public boolean isLeaderHandoverConsumeStepDownStampEnabled() {
  return leaderHandoverConsumeStepDownStamp;
}

/**
 * @return the value loaded from {@code SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS}:
 *         how long, in milliseconds, the emit path waits for the stamp's produce ack.
 */
public long getLeaderHandoverEmitStepDownStampAckTimeoutMs() {
  return leaderHandoverEmitStepDownStampAckTimeoutMs;
}

/**
 * @return the value loaded from {@code SERVER_INGESTION_INFO_LOG_LINE_LIMIT}.
 */
public int getServerIngestionInfoLogLineLimit() {
  return serverIngestionInfoLogLineLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
"State transition from STANDBY to LEADER is paused for replica: {} as this store is undergoing migration",
partitionConsumptionState.getReplicaId());
} else {
// Track the new leader's term so that on the next LEADER -> STANDBY transition we can
// emit a Leader Step-Down stamp tagged with this term, and so that the consume-side
// fast-path check can compare observed stamp termIds against "my term".
partitionConsumptionState.setCurrentLeaderTermId(checker.getLeadershipTerm());

// Initialize DoL state and send DoL stamp to local VT
initializeAndSendDoLStamp(partitionConsumptionState, checker.getLeadershipTerm());

Expand Down Expand Up @@ -600,8 +605,16 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef =
partitionConsumptionState.getVeniceWriterLazyRef();
if (veniceWriterLazyRef != null) {
veniceWriterLazyRef.ifPresent(vw -> vw.closePartition(partition));
veniceWriterLazyRef.ifPresent(vw -> {
// closePartition first - this writes a regular EndOfSegment closing any open segment for
// this leader's producer, draining and ending the data segment cleanly. Then the
// step-down stamp goes onto VT as the last (and self-contained, DoL-style) record for
// this term. The new leader's tail check looks for the stamp.
vw.closePartition(partition);
emitLeaderStepDownStampIfEnabled(vw, partitionConsumptionState, partition);
});
}
partitionConsumptionState.clearCurrentLeaderTermId();
Comment thread
sushantmane marked this conversation as resolved.
Outdated
break;
default:
processCommonConsumerAction(message);
Expand Down Expand Up @@ -1112,6 +1125,87 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum
}
}

/**
* Emit a Leader Step-Down Stamp on the local VT partition during a Leader -> Standby (or
* Leader -> Offline) transition, behind
* {@link com.linkedin.venice.ConfigKeys#SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP}.
*
* <p>The stamp is a DoL-style self-contained control message
* ({@link com.linkedin.venice.writer.VeniceWriter#sendLeaderStepDownStamp}). It uses its own
* producer GUID and fixed {@code (segmentNumber=0, sequenceNumber=0)}, so it is always
* emittable regardless of whether the leader has an open segment for this partition - it does
* not race against straggler writes on the leader's own segment, and it is not lost when the
* leader had nothing to flush.
*
* <p>Stamped with the demoting leader's {@code currentLeaderTermId} (recorded at promotion
* time) via the {@code LeaderMetadata.termId} footer, so the next leader's term-inequality
* check can identify it.
*
* <p>This method is intentionally best-effort: any failure to land the stamp is logged and
* swallowed. If the stamp never lands, the next leader will fall back to the legacy 5-minute
* inactivity wait - which is the pre-change behavior.
*/
private void emitLeaderStepDownStampIfEnabled(
VeniceWriter<byte[], byte[], byte[]> vw,
PartitionConsumptionState pcs,
int partition) {
if (!serverConfig.isLeaderHandoverEmitStepDownStampEnabled()) {
return;
}
long term = pcs.getCurrentLeaderTermId();
if (term <= 0) {
LOGGER.debug(
"Skipping Leader Step-Down stamp for replica: {} partition: {} - no leadership term recorded.",
pcs.getReplicaId(),
partition);
return;
}
CompletableFuture<PubSubProduceResult> ackFuture;
try {
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
ackFuture = vw.sendLeaderStepDownStamp(topicPartition, null, term, localKafkaClusterId);
} catch (Exception e) {
LOGGER.warn(
"Failed to emit Leader Step-Down stamp for replica: {} term: {}. New leader will fall back to legacy 5-minute wait.",
pcs.getReplicaId(),
term,
e);
hostLevelIngestionStats.recordLeaderStepdownStampEmitFailure();
return;
}

long ackTimeoutMs = serverConfig.getLeaderHandoverEmitStepDownStampAckTimeoutMs();
try {
ackFuture.get(ackTimeoutMs, TimeUnit.MILLISECONDS);
LOGGER.info("Emitted Leader Step-Down stamp for replica: {} term: {} (ack received).", pcs.getReplicaId(), term);
hostLevelIngestionStats.recordLeaderStepdownStampEmitSuccess();
} catch (InterruptedException ie) {
// Restore interrupt status and exit the wait path: caller (Helix state transition / shutdown)
// will observe the interrupt. The stamp may or may not have landed; the new leader simply
// falls back to the legacy 5-minute wait if it does not see it.
Thread.currentThread().interrupt();
LOGGER.warn(
"Interrupted while waiting for Leader Step-Down stamp ack for replica: {} term: {}. Restoring interrupt status.",
pcs.getReplicaId(),
term);
hostLevelIngestionStats.recordLeaderStepdownStampEmitFailure();
} catch (TimeoutException te) {
LOGGER.warn(
"Leader Step-Down stamp emitted for replica: {} term: {} but ack not received within {} ms - new leader will fall back to legacy wait.",
pcs.getReplicaId(),
term,
ackTimeoutMs);
hostLevelIngestionStats.recordLeaderStepdownStampEmitFailure();
} catch (java.util.concurrent.ExecutionException ee) {
LOGGER.warn(
"Leader Step-Down stamp produce failed for replica: {} term: {}. New leader will fall back to legacy 5-minute wait.",
pcs.getReplicaId(),
term,
ee.getCause() != null ? ee.getCause() : ee);
hostLevelIngestionStats.recordLeaderStepdownStampEmitFailure();
}
Comment thread
sushantmane marked this conversation as resolved.
}
Comment thread
sushantmane marked this conversation as resolved.

/**
* Callback to handle DoL message produce completion.
* Package-private for testing.
Expand Down Expand Up @@ -1208,9 +1302,17 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
// DoL is ready - for non-system stores, also verify legacy time-based checks as an additional safety layer.
// Rationale: DoL is a new mechanism, so we keep the legacy check as a safety net during initial rollout
// for user stores (which contain customer data). System stores skip this extra check because they're
// critical for cluster operation and we want faster leader transitions there. Once DoL is proven stable
// in production, this extra check can be removed to get the full performance benefit.
if (!isSystemStore && !canSwitchToLeaderTopicLegacy(pcs)) {
// critical for cluster operation and we want faster leader transitions there.
//
// Fast-path bypass: when the consume-side step-down stamp flag is enabled and we have
// observed a Leader Step-Down Stamp from a strictly earlier term at the tail of local VT,
// treat the legacy safety net as satisfied. This collapses the 5-minute inactivity wait
// to ~0 in the common (cooperative) handover case. The check is conservative: any
// subsequent non-DoL, non-self record on VT would have cleared the stamp, so we only
// fast-path when the previous leader's last act on this partition was the stamp.
boolean legacyOk = canSwitchToLeaderTopicLegacy(pcs);
boolean stampFastPath = !legacyOk && stepDownStampObserved(pcs);
if (!isSystemStore && !legacyOk && !stampFastPath) {
LOGGER.debug(
"DoL mechanism complete for replica: {} but legacy time-based check not yet satisfied. Waiting for legacy check to pass.",
pcs.getReplicaId());
Expand All @@ -1224,6 +1326,11 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
pcs.getReplicaId(),
dolLatencyMs,
dolStamp);
if (stampFastPath) {
hostLevelIngestionStats.recordLeaderHandoverFastPath();
} else if (!isSystemStore) {
hostLevelIngestionStats.recordLeaderHandoverLegacyWait();
}
pcs.clearDolState();
return true;
} else {
Expand All @@ -1239,6 +1346,94 @@ private boolean canSwitchToLeaderTopic(PartitionConsumptionState pcs) {
}
}

/**
 * Decides whether the legacy inactivity wait may be bypassed because a Leader Step-Down Stamp
 * ({@link KafkaKey#LEADER_STEPDOWN_STAMP}) from a strictly earlier term is the last thing this
 * replica observed on local VT.
 *
 * <p>All of the following must hold, checked in this order:
 * <ol>
 *   <li><b>Feature flag:</b> {@code server.leader.handover.consume.stepdown.stamp} is on.</li>
 *   <li><b>Semantic:</b> the partition state still flags the last observed non-self marker as a
 *       step-down stamp. {@link #observeRecordForLeaderHandover} clears that flag as soon as any
 *       non-DoL, non-self record is consumed after the stamp, so a stale stamp cannot be
 *       replayed against a later transition.</li>
 *   <li><b>Identity (term-based):</b> both terms are positive and the stamp's term is strictly
 *       less than this replica's {@code currentLeaderTermId}.</li>
 * </ol>
 *
 * @param pcs consumption state of the replica being promoted
 * @return {@code true} if the fast path may be taken, {@code false} otherwise
 */
private boolean stepDownStampObserved(PartitionConsumptionState pcs) {
  boolean consumeEnabled = serverConfig.isLeaderHandoverConsumeStepDownStampEnabled();
  if (!consumeEnabled || !pcs.isLastObservedNonSelfEosGraceful()) {
    return false;
  }

  long observedTerm = pcs.getLastObservedNonSelfEosTermId();
  long myTerm = pcs.getCurrentLeaderTermId();
  boolean stampIsFromEarlierTerm = myTerm > 0 && observedTerm > 0 && observedTerm < myTerm;

  if (stampIsFromEarlierTerm) {
    LOGGER.info(
        "Leader Step-Down stamp observed for replica: {} - skipping legacy 5-minute wait. My term: {}, observed stamp term: {}",
        pcs.getReplicaId(),
        myTerm,
        observedTerm);
    return true;
  }

  LOGGER.debug(
      "Step-down stamp observed for replica: {} but term inequality not satisfied: my term = {}, observed term = {}",
      pcs.getReplicaId(),
      myTerm,
      observedTerm);
  return false;
}

/**
 * Maintains the "last observed Leader Step-Down stamp" marker on the partition state as records
 * are consumed, so that {@code stepDownStampObserved} can later decide whether the legacy wait
 * may be bypassed.
 *
 * <p>Behavior per record:
 * <ul>
 *   <li>Nothing is tracked when the consume-side flag is off: the marker's only reader in this
 *       class ignores it in that case, so the per-record work is skipped entirely.</li>
 *   <li>Records not consumed from the local version topic are ignored.</li>
 *   <li>A record keyed with {@link KafkaKey#LEADER_STEPDOWN_STAMP} is recorded as the latest
 *       stamp, unless its term equals this replica's own current term.</li>
 *   <li>A record keyed with {@link KafkaKey#DOL_STAMP}, or carrying this replica's own positive
 *       term, leaves the marker untouched.</li>
 *   <li>Any other record invalidates a previously recorded marker.</li>
 * </ul>
 *
 * @param consumerRecord the consumed record; supplies the source topic and the raw key
 * @param kafkaKey unused by this implementation
 * @param kafkaValue envelope whose optional {@code leaderMetadataFooter} carries the term id
 * @param controlMessage unused by this implementation
 * @param pcs consumption state that stores the marker
 */
@Override
protected void observeRecordForLeaderHandover(
    DefaultPubSubMessage consumerRecord,
    KafkaKey kafkaKey,
    KafkaMessageEnvelope kafkaValue,
    ControlMessage controlMessage,
    PartitionConsumptionState pcs) {
  // This runs for every consumed record. With the consume flag off the marker is never read
  // (stepDownStampObserved returns false first), so skip all bookkeeping.
  if (!serverConfig.isLeaderHandoverConsumeStepDownStampEnabled()) {
    return;
  }
  // Only observe records consumed from the local VT - stamps on RT or remote VT are not part
  // of the local-VT-tail handshake we are trying to detect.
  if (!versionTopic.equals(consumerRecord.getTopicPartition().getPubSubTopic())) {
    return;
  }
  long myTerm = pcs.getCurrentLeaderTermId();
  long recordTermId = kafkaValue.leaderMetadataFooter != null ? kafkaValue.leaderMetadataFooter.termId : -1L;
  byte[] recordKey = consumerRecord.getKey().getKey();

  // Identify the Leader Step-Down Stamp by its dedicated KafkaKey. Reuse the
  // "lastObservedNonSelfEos*" PCS slots (semantically: "last observed step-down stamp from
  // another term") to avoid widening the PCS surface area.
  if (Arrays.equals(recordKey, KafkaKey.LEADER_STEPDOWN_STAMP.getKey())) {
    if (recordTermId == myTerm) {
      // Defensive guard: a step-down stamp with my own term should never happen.
      return;
    }
    pcs.recordObservedNonSelfEos(recordTermId, true);
    LOGGER.debug(
        "Recorded Leader Step-Down stamp on local VT for replica: {}: termId={}",
        pcs.getReplicaId(),
        recordTermId);
    return;
  }

  // Any other record observed on local VT invalidates a previously recorded stamp, unless
  // it is this replica's own DoL stamp loopback (identified by KafkaKey.DOL_STAMP), or any
  // self-produced record (termId == myTerm).
  if (Arrays.equals(recordKey, KafkaKey.DOL_STAMP.getKey())) {
    return;
  }
  if (recordTermId == myTerm && myTerm > 0) {
    return;
  }
  // Read before writing so the common "nothing recorded" case does no volatile writes.
  if (pcs.isLastObservedNonSelfEosGraceful() || pcs.getLastObservedNonSelfEosTermId() != -1L) {
    pcs.clearObservedNonSelfEos();
  }
}
Comment thread
sushantmane marked this conversation as resolved.

/**
* Legacy mechanism for determining when a replica can switch to consuming from the leader (RT) topic.
* Uses time-based waiting and special handling for user system stores.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,37 @@ enum LatchStatus {
*/
private volatile long highestLeadershipTerm = -1;

/**
* The leadership term assigned to this replica when it most recently transitioned
* STANDBY -> LEADER, sourced from the Helix message create-timestamp at promotion.
* Set to -1 whenever this replica is not currently a leader. Used by:
* <ul>
* <li>The Leader Step-Down stamp emit path on LEADER -> STANDBY/OFFLINE — supplies the
* term being closed, which is passed to the writer when the stamp is produced.</li>
* <li>The consume-side fast-path check on the new leader — used as the
* strictly-greater-than reference when comparing against the termId of an observed
* Leader Step-Down stamp.</li>
* </ul>
Comment thread
sushantmane marked this conversation as resolved.
*/
private volatile long currentLeaderTermId = -1;

/**
* Tracks whether the most recently observed non-self marker on this partition's local VT
* was a Leader Step-Down stamp, i.e. a graceful leadership handoff. Set to {@code true}
* when a record keyed with {@code KafkaKey.LEADER_STEPDOWN_STAMP} and carrying a
* {@code termId} different from {@link #currentLeaderTermId} is observed. Cleared back to
* {@code false} whenever any subsequent non-self, non-DoL record is observed, so that a
* stale stamp cannot be replayed against a later transition. The "Eos" in the name is
* historical: the marker was originally carried on a graceful {@code EndOfSegment}.
*/
private volatile boolean lastObservedNonSelfEosGraceful = false;

/**
* The {@code LeaderMetadata.termId} of the most recently observed non-self Leader
* Step-Down stamp on this partition's local VT. Set to -1 when no such stamp has been
* seen, or when invalidated by a subsequent non-self, non-DoL record.
*/
private volatile long lastObservedNonSelfEosTermId = -1;
Comment thread
sushantmane marked this conversation as resolved.
Outdated

/**
* This future is completed in drainer thread after persisting the associated record and offset to DB.
*/
Expand Down Expand Up @@ -880,6 +911,46 @@ public void setHighestLeadershipTerm(long term) {
this.highestLeadershipTerm = term;
}

/**
 * @return the term last stored via {@link #setCurrentLeaderTermId(long)}, or -1 if none was
 *         stored or it was reset by {@link #clearCurrentLeaderTermId()}.
 */
public long getCurrentLeaderTermId() {
  return currentLeaderTermId;
}

/**
 * Stores the leadership term this replica is currently serving.
 *
 * @param term the term id to remember
 */
public void setCurrentLeaderTermId(long term) {
  currentLeaderTermId = term;
}

/** Resets the stored leadership term to the "no term" sentinel, -1. */
public void clearCurrentLeaderTermId() {
  currentLeaderTermId = -1;
}

/**
 * @return the graceful-handoff flag last stored via
 *         {@link #recordObservedNonSelfEos(long, boolean)}, or {@code false} if nothing was
 *         recorded or the marker was cleared by {@link #clearObservedNonSelfEos()}.
 */
public boolean isLastObservedNonSelfEosGraceful() {
  return lastObservedNonSelfEosGraceful;
}

/**
 * @return the term id last stored via {@link #recordObservedNonSelfEos(long, boolean)}, or -1
 *         if nothing was recorded or the marker was cleared by
 *         {@link #clearObservedNonSelfEos()}.
 */
public long getLastObservedNonSelfEosTermId() {
  return lastObservedNonSelfEosTermId;
}

/**
* Record a handoff marker that was just consumed on the local VT for this partition. The
* "Eos" in the name is historical; the marker recorded through this method is now a Leader
* Step-Down stamp rather than an {@code EndOfSegment}.
* Call only when the marker's termId differs from this replica's {@link #currentLeaderTermId}
* (i.e. the marker was authored by some other leader's term).
*
* @param termId the {@code LeaderMetadata.termId} carried by the observed marker
* @param gracefulLeadershipHandoff whether the marker signals a graceful leadership handoff
*/
public void recordObservedNonSelfEos(long termId, boolean gracefulLeadershipHandoff) {
// The term id is written before the flag. NOTE(review): readers appear to check the flag
// first and then read the term id, so keep this order unless all readers are revisited.
this.lastObservedNonSelfEosTermId = termId;
this.lastObservedNonSelfEosGraceful = gracefulLeadershipHandoff;
}

/**
* Invalidate any previously recorded handoff marker (now a Leader Step-Down stamp; the "Eos"
* in the name is historical). Call when a non-self, non-DoL record lands at the VT tail after
* the last observed marker, so that a stale marker cannot be replayed against a later
* transition. Resets the flag to {@code false} and the term id to -1.
*/
public void clearObservedNonSelfEos() {
// The flag is dropped before the term id is reset. NOTE(review): readers appear to check the
// flag first, so keep this order unless all readers are revisited.
this.lastObservedNonSelfEosGraceful = false;
this.lastObservedNonSelfEosTermId = -1;
}

/**
 * Stores the given future as the last leader persist future.
 *
 * @param future the future to remember
 */
public void setLastLeaderPersistFuture(Future<Void> future) {
  lastLeaderPersistFuture = future;
}
Expand Down
Loading