Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_CONSUME_GRACEFUL_EOS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS_ACK_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
Expand Down Expand Up @@ -712,6 +715,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean useCheckpointedPubSubPositionWithFallback;
private final boolean leaderHandoverUseDoLMechanismForSystemStores;
private final boolean leaderHandoverUseDoLMechanismForUserStores;
private final boolean leaderHandoverEmitGracefulEos;
private final boolean leaderHandoverConsumeGracefulEos;
private final long leaderHandoverEmitGracefulEosAckTimeoutMs;
private final LogContext logContext;
private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy;

Expand Down Expand Up @@ -1268,6 +1274,11 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, true);
this.leaderHandoverUseDoLMechanismForUserStores =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, true);
this.leaderHandoverEmitGracefulEos = serverProperties.getBoolean(SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS, true);
this.leaderHandoverConsumeGracefulEos =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_CONSUME_GRACEFUL_EOS, false);
this.leaderHandoverEmitGracefulEosAckTimeoutMs =
serverProperties.getLong(SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS_ACK_TIMEOUT_MS, 5000L);
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
this.parallelResourceShutdownEnabled =
serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
Expand Down Expand Up @@ -2286,6 +2297,18 @@ public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() {
return this.leaderHandoverUseDoLMechanismForUserStores;
}

/**
 * @return the value read at construction from
 *         {@link com.linkedin.venice.ConfigKeys#SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS}
 *         (defaults to {@code true} when the property is absent)
 */
public boolean isLeaderHandoverEmitGracefulEosEnabled() {
return this.leaderHandoverEmitGracefulEos;
}

/**
 * @return the value read at construction from
 *         {@link com.linkedin.venice.ConfigKeys#SERVER_LEADER_HANDOVER_CONSUME_GRACEFUL_EOS}
 *         (defaults to {@code false} when the property is absent)
 */
public boolean isLeaderHandoverConsumeGracefulEosEnabled() {
return this.leaderHandoverConsumeGracefulEos;
}

/**
 * @return the ack timeout, in milliseconds, read at construction from
 *         {@link com.linkedin.venice.ConfigKeys#SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS_ACK_TIMEOUT_MS}
 *         (defaults to {@code 5000} when the property is absent)
 */
public long getLeaderHandoverEmitGracefulEosAckTimeoutMs() {
return this.leaderHandoverEmitGracefulEosAckTimeoutMs;
}

/**
 * @return the value read at construction from {@code SERVER_INGESTION_INFO_LOG_LINE_LIMIT}
 *         (defaults to {@code 20} when the property is absent)
 */
public int getServerIngestionInfoLogLineLimit() {
return this.serverIngestionInfoLogLineLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
"State transition from STANDBY to LEADER is paused for replica: {} as this store is undergoing migration",
partitionConsumptionState.getReplicaId());
} else {
// Track the new leader's term so that on the next LEADER -> STANDBY transition we can
// emit a graceful-leadership-handoff EndOfSegment stamped with this term, and so that
// the consume-side fast-path check can compare observed EOS termIds against "my term".
partitionConsumptionState.setCurrentLeaderTermId(checker.getLeadershipTerm());

// Initialize DoL state and send DoL stamp to local VT
initializeAndSendDoLStamp(partitionConsumptionState, checker.getLeadershipTerm());

Expand Down Expand Up @@ -600,8 +605,12 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef =
partitionConsumptionState.getVeniceWriterLazyRef();
if (veniceWriterLazyRef != null) {
veniceWriterLazyRef.ifPresent(vw -> vw.closePartition(partition));
veniceWriterLazyRef.ifPresent(vw -> {
emitGracefulLeadershipEosIfEnabled(vw, partitionConsumptionState, partition);
vw.closePartition(partition);
});
}
partitionConsumptionState.clearCurrentLeaderTermId();
Comment thread
sushantmane marked this conversation as resolved.
Outdated
break;
default:
processCommonConsumerAction(message);
Expand Down Expand Up @@ -1112,6 +1121,59 @@ private void initializeAndSendDoLStamp(PartitionConsumptionState partitionConsum
}
}

/**
* Emit a graceful-leadership-handoff {@code EndOfSegment} on the local VT partition during a
* Leader -> Standby (or Leader -> Offline) transition, behind
* {@link com.linkedin.venice.ConfigKeys#SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS}. The marker
* is stamped with the demoting leader's {@code currentLeaderTermId} (recorded at promotion
* time) so that the next leader can take the fast path by comparing termIds.
*
* <p>This method is intentionally best-effort: any failure to land the EOS is logged and
* swallowed. If the marker never lands, the next leader will fall back to the legacy
* 5-minute inactivity wait — which is the pre-change behavior.
*/
private void emitGracefulLeadershipEosIfEnabled(
VeniceWriter<byte[], byte[], byte[]> vw,
PartitionConsumptionState pcs,
int partition) {
if (!serverConfig.isLeaderHandoverEmitGracefulEosEnabled()) {
return;
}
long term = pcs.getCurrentLeaderTermId();
if (term <= 0) {
LOGGER.debug(
"Skipping graceful leadership EOS for replica: {} partition: {} - no leadership term recorded.",
pcs.getReplicaId(),
partition);
return;
}
try {
CompletableFuture<PubSubProduceResult> ackFuture =
vw.sendGracefulLeadershipEndOfSegment(partition, term, localKafkaClusterId, null);
long ackTimeoutMs = serverConfig.getLeaderHandoverEmitGracefulEosAckTimeoutMs();
try {
ackFuture.get(ackTimeoutMs, TimeUnit.MILLISECONDS);
LOGGER
.info("Emitted graceful leadership EOS for replica: {} term: {} (ack received).", pcs.getReplicaId(), term);
hostLevelIngestionStats.recordLeaderStepdownGracefulEosEmitSuccess();
Comment thread
sushantmane marked this conversation as resolved.
Outdated
} catch (TimeoutException te) {
LOGGER.warn(
"Graceful leadership EOS emitted for replica: {} term: {} but ack not received within {} ms - new leader will fall back to legacy wait.",
pcs.getReplicaId(),
term,
ackTimeoutMs);
hostLevelIngestionStats.recordLeaderStepdownGracefulEosEmitFailure();
}
} catch (Exception e) {
LOGGER.warn(
"Failed to emit graceful leadership EOS for replica: {} term: {}. New leader will fall back to legacy 5-minute wait.",
pcs.getReplicaId(),
term,
e);
hostLevelIngestionStats.recordLeaderStepdownGracefulEosEmitFailure();
Comment thread
sushantmane marked this conversation as resolved.
Outdated
}
}

/**
* Callback to handle DoL message produce completion.
* Package-private for testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,20 @@ enum LatchStatus {
*/
private volatile long highestLeadershipTerm = -1;

/**
 * The leadership term recorded for this replica when it most recently transitioned
 * STANDBY -> LEADER (see {@link #setCurrentLeaderTermId(long)}). Initialized to -1 and reset
 * to -1 by {@link #clearCurrentLeaderTermId()}, so -1 means "no term recorded". Used by:
 * <ul>
 * <li>The graceful-EOS emit path on LEADER -> STANDBY/OFFLINE, which reads the
 * term-being-closed to stamp on the EndOfSegment's LeaderMetadata.</li>
 * <li>The consume-side fast-path check on the new leader, where it is intended to be the
 * strictly-greater-than reference when comparing against an observed graceful
 * EOS's termId.</li>
 * </ul>
 */
private volatile long currentLeaderTermId = -1;

/**
* This future is completed in drainer thread after persisting the associated record and offset to DB.
*/
Expand Down Expand Up @@ -880,6 +894,18 @@ public void setHighestLeadershipTerm(long term) {
this.highestLeadershipTerm = term;
}

/**
 * @return the currently recorded leader term id, or -1 if none has been set or it has been
 *         cleared via {@link #clearCurrentLeaderTermId()}
 */
public long getCurrentLeaderTermId() {
return this.currentLeaderTermId;
}

/**
 * Records the leader term id for this replica. The value is stored as-is; no validation is
 * performed here.
 *
 * @param term the term id to record
 */
public void setCurrentLeaderTermId(long term) {
this.currentLeaderTermId = term;
}

/**
 * Resets the recorded leader term id to -1, the same value the field is initialized with.
 */
public void clearCurrentLeaderTermId() {
this.currentLeaderTermId = -1;
}

/**
 * Replaces the stored {@code lastLeaderPersistFuture} reference with the given future.
 *
 * @param future the future to store; stored as-is, no null check is performed here
 */
public void setLastLeaderPersistFuture(Future<Void> future) {
this.lastLeaderPersistFuture = future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
private final LongAdderRateGauge batchProcessingRequestRecordsSensor;
private final Sensor batchProcessingRequestLatencySensor;
private final LongAdderRateGauge batchProcessingRequestErrorSensor;
private final Sensor leaderStepdownGracefulEosEmitSuccessSensor;
private final Sensor leaderStepdownGracefulEosEmitFailureSensor;

/**
* @param totalStats the total stats singleton instance, or null if we are constructing the total stats
Expand Down Expand Up @@ -388,6 +390,18 @@ public HostLevelIngestionStats(
() -> totalStats.resubscriptionFailureSensor,
new Count());

this.leaderStepdownGracefulEosEmitSuccessSensor = registerPerStoreAndTotalSensor(
"leader_stepdown_graceful_eos_emit_success",
totalStats,
() -> totalStats.leaderStepdownGracefulEosEmitSuccessSensor,
new Count());

this.leaderStepdownGracefulEosEmitFailureSensor = registerPerStoreAndTotalSensor(
"leader_stepdown_graceful_eos_emit_failure",
totalStats,
() -> totalStats.leaderStepdownGracefulEosEmitFailureSensor,
new Count());

this.leaderProducerSynchronizeLatencySensor = registerPerStoreAndTotalSensor(
"leader_producer_synchronize_latency",
totalStats,
Expand Down Expand Up @@ -652,6 +666,14 @@ public void recordResubscriptionFailure() {
resubscriptionFailureSensor.record();
}

/**
 * Records one occurrence on the {@code leader_stepdown_graceful_eos_emit_success} sensor.
 */
public void recordLeaderStepdownGracefulEosEmitSuccess() {
leaderStepdownGracefulEosEmitSuccessSensor.record();
}

/**
 * Records one occurrence on the {@code leader_stepdown_graceful_eos_emit_failure} sensor.
 */
public void recordLeaderStepdownGracefulEosEmitFailure() {
leaderStepdownGracefulEosEmitFailureSensor.record();
}

/**
 * Records a sample on the {@code leader_producer_synchronize_latency} sensor.
 *
 * @param latency the latency sample to record (unit is not visible here; presumably
 *        milliseconds, confirm against callers)
 */
public void recordLeaderProducerSynchronizeLatency(double latency) {
leaderProducerSynchronizeLatencySensor.record(latency);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,4 +607,23 @@ public void testGetOrCreateCachedHeartbeatKeyResolvesRegularUnchunked() {
assertEquals(k.getChunkingStatus(), VeniceChunkingStatus.UNCHUNKED);
assertEquals(k.getLocality(), VeniceRegionLocality.LOCAL);
}

/**
 * Verifies the default / set / clear lifecycle of
 * {@link PartitionConsumptionState#getCurrentLeaderTermId()}.
 */
@Test
public void testCurrentLeaderTermIdLifecycle() {
  final long noTermRecorded = -1L;
  final long promotedTerm = 1000L;

  OffsetRecord offsetRecord = mock(OffsetRecord.class);
  PartitionConsumptionState state =
      new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false, false, false, null);

  // A freshly constructed state has no term recorded.
  assertEquals(state.getCurrentLeaderTermId(), noTermRecorded);

  // Setting a term makes it observable through the getter.
  state.setCurrentLeaderTermId(promotedTerm);
  assertEquals(state.getCurrentLeaderTermId(), promotedTerm);

  // Clearing returns to the "no term recorded" value.
  state.clearCurrentLeaderTermId();
  assertEquals(state.getCurrentLeaderTermId(), noTermRecorded);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,56 @@ private ConfigKeys() {
public static final String SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES =
"server.leader.handover.use.dol.mechanism.for.user.stores";

/**
 * Controls whether a leader replica emits a graceful-leadership-handoff EndOfSegment control
 * message to the local version topic (VT) when it receives a Helix Leader -> Standby (or
 * Leader -> Offline) state transition.
 *
 * <p>The marker is a normal {@code EndOfSegment} control message with
 * {@code gracefulLeadershipHandoff = true} and {@code finalSegment = true}, carrying the
 * outgoing leader's {@code LeaderMetadata.termId}. A new leader (when its consume-side flag,
 * {@link #SERVER_LEADER_HANDOVER_CONSUME_GRACEFUL_EOS}, is on) uses the marker to skip the
 * legacy 5-minute inactivity wait in {@code canSwitchToLeaderTopicLegacy}.
 *
 * <p>Replicas that do not have the consume-side flag set do not act on the marker, so this
 * flag is intended to be safe to enable fleet-wide before flipping the consume-side flag.
 *
 * <p>Default: true (emit the graceful handoff marker on every L -> Standby/Offline transition)
 */
public static final String SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS = "server.leader.handover.emit.graceful.eos";

/**
 * Controls whether a new leader replica honors a graceful-leadership-handoff EndOfSegment
 * marker (see {@link #SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS}) when deciding whether to skip
 * the legacy 5-minute inactivity wait during Standby -> Leader transition.
 *
 * <p>When enabled, after the DoL loopback completes the new leader checks the most recent
 * non-self EndOfSegment it has observed on the local VT. If that EOS has
 * {@code gracefulLeadershipHandoff = true} and its {@code LeaderMetadata.termId} is strictly
 * less than the new leader's own term, the new leader bypasses the legacy 5-minute wait and
 * immediately proceeds to switch to the leader source topic.
 *
 * <p>If the marker is absent, stale, or from the new leader's own term, the new leader falls
 * back to the legacy 5-minute inactivity wait. The fallback is byte-for-byte equivalent to the
 * pre-change behavior.
 *
 * <p>Default: false (legacy 5-minute wait is preserved until the feature is validated)
 */
public static final String SERVER_LEADER_HANDOVER_CONSUME_GRACEFUL_EOS =
"server.leader.handover.consume.graceful.eos";

/**
 * Bounded timeout (in milliseconds) the outgoing leader will wait for the broker to ack the
 * graceful-leadership-handoff EndOfSegment marker emitted on Leader -> Standby / Offline. After
 * the timeout, the leader proceeds with the rest of the demotion sequence regardless. If the
 * ack never arrives, the new leader simply will not observe the marker and will fall back to
 * the legacy 5-minute wait. Safety is unaffected.
 *
 * <p>Only relevant when {@link #SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS} is enabled.
 *
 * <p>Default: 5000 ms
 */
public static final String SERVER_LEADER_HANDOVER_EMIT_GRACEFUL_EOS_ACK_TIMEOUT_MS =
"server.leader.handover.emit.graceful.eos.ack.timeout.ms";

public static final String SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS =
"server.netty.graceful.shutdown.period.seconds";
public static final String SERVER_NETTY_WORKER_THREADS = "server.netty.worker.threads";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public enum AvroProtocolDefinition {
/**
* Used for the Kafka topics, including the main data topics as well as the admin topic.
*/
KAFKA_MESSAGE_ENVELOPE(23, 14, KafkaMessageEnvelope.class),
KAFKA_MESSAGE_ENVELOPE(23, 15, KafkaMessageEnvelope.class),

/**
* Used to persist the state of a partition in Storage Nodes, including offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,71 @@ public CompletableFuture<PubSubProduceResult> sendDoLStamp(
}
}

/**
 * Produces an {@link EndOfSegment} control message flagged as a graceful leadership handoff.
 * Intended for a leader replica that is voluntarily stepping down (Helix Leader -> Standby or
 * Leader -> Offline transition). The emitted message has:
 * <ul>
 *   <li>{@code gracefulLeadershipHandoff = true}, which distinguishes it from a regular
 *       segment close;</li>
 *   <li>{@code finalSegment = true};</li>
 *   <li>{@link com.linkedin.venice.kafka.protocol.LeaderMetadata#termId} set to
 *       {@code leadershipTerm}, identifying the term being closed.</li>
 * </ul>
 *
 * <p>The whole operation runs while holding the partition lock. When the partition has an
 * open segment, that segment is marked ended (as final) before this method returns, even if
 * sending the control message throws. When there is no open segment (none created, not
 * started, or already ended) nothing is sent and the returned future is already completed
 * with {@code null}.
 *
 * <p>Failure to land this message (timeout, broker error) is safe: the new leader will simply
 * not observe the marker and will fall back to the legacy 5-minute inactivity wait, which is
 * the pre-change behavior.
 *
 * @param partition the partition whose current segment should be closed
 * @param leadershipTerm the termId of the leader that is stepping down, copied into the
 *        leader metadata of the emitted EOS
 * @param localPubSubClusterId the cluster id placed in the leader metadata of this control
 *        message
 * @param callback callback passed through to the underlying produce call
 * @return a future for the produce result, or a completed-null future if no segment was open
 */
public CompletableFuture<PubSubProduceResult> sendGracefulLeadershipEndOfSegment(
    int partition,
    long leadershipTerm,
    int localPubSubClusterId,
    PubSubProducerCallback callback) {
  LeaderMetadataWrapper steppingDownLeaderMetadata =
      new LeaderMetadataWrapper(PubSubSymbolicPosition.EARLIEST, localPubSubClusterId, leadershipTerm);
  synchronized (this.partitionLocks[partition]) {
    Segment openSegment = segments[partition];
    boolean hasOpenSegment = openSegment != null && openSegment.isStarted() && !openSegment.isEnded();
    if (!hasOpenSegment) {
      logger.info(
          "Graceful leadership EOS requested for partition {} (term {}) but no open segment; skipping.",
          partition,
          leadershipTerm);
      return CompletableFuture.completedFuture(null);
    }
    try {
      // Build the EOS payload first, then wrap it in the control message envelope.
      EndOfSegment handoffEos = new EndOfSegment();
      handoffEos.checksumValue = ByteBuffer.wrap(openSegment.getFinalCheckSum());
      handoffEos.computedAggregates = Collections.emptyList();
      handoffEos.finalSegment = true;
      handoffEos.gracefulLeadershipHandoff = true;

      ControlMessage handoffMessage = new ControlMessage();
      handoffMessage.controlMessageType = ControlMessageType.END_OF_SEGMENT.getValue();
      handoffMessage.controlMessageUnion = handoffEos;

      logger.info(
          "Emitting graceful leadership handoff EndOfSegment on partition {} for term {}",
          partition,
          leadershipTerm);
      return sendControlMessage(
          handoffMessage,
          partition,
          Collections.emptyMap(),
          callback,
          steppingDownLeaderMetadata);
    } finally {
      // Always close the segment as final, whether or not the send succeeded.
      openSegment.end(true);
    }
  }
}

public static KafkaMessageEnvelope getHeartbeatKME(
long originTimeStampMs,
LeaderMetadataWrapper leaderMetadataWrapper,
Expand Down
Loading
Loading