Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_BASED_REPLICA_AUTO_RESUBSCRIBE_THRESHOLD_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_LAG_MONITOR_CLEANUP_CYCLE;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
Expand Down Expand Up @@ -714,6 +717,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean useCheckpointedPubSubPositionWithFallback;
private final boolean leaderHandoverUseDoLMechanismForSystemStores;
private final boolean leaderHandoverUseDoLMechanismForUserStores;
private final boolean leaderHandoverEmitStepDownStamp;
private final boolean leaderHandoverConsumeStepDownStamp;
private final long leaderHandoverEmitStepDownStampAckTimeoutMs;
private final LogContext logContext;
private final IngestionTaskReusableObjects.Strategy ingestionTaskReusableObjectsStrategy;

Expand Down Expand Up @@ -1272,6 +1278,12 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORES, true);
this.leaderHandoverUseDoLMechanismForUserStores =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES, true);
this.leaderHandoverEmitStepDownStamp =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP, true);
Comment thread
sushantmane marked this conversation as resolved.
Outdated
this.leaderHandoverConsumeStepDownStamp =
serverProperties.getBoolean(SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP, false);
this.leaderHandoverEmitStepDownStampAckTimeoutMs =
serverProperties.getLong(SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS, 5000L);
this.serverIngestionInfoLogLineLimit = serverProperties.getInt(SERVER_INGESTION_INFO_LOG_LINE_LIMIT, 20);
this.parallelResourceShutdownEnabled =
serverProperties.getBoolean(SERVER_PARALLEL_RESOURCE_SHUTDOWN_ENABLED, false);
Expand Down Expand Up @@ -2294,6 +2306,18 @@ public boolean isLeaderHandoverUseDoLMechanismEnabledForUserStores() {
return this.leaderHandoverUseDoLMechanismForUserStores;
}

/**
 * Reports the emit-side Leader Step-Down Stamp flag that the constructor loads from
 * {@code SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP}.
 *
 * @return the configured value of the emit flag
 */
public boolean isLeaderHandoverEmitStepDownStampEnabled() {
return this.leaderHandoverEmitStepDownStamp;
}

/**
 * Reports the consume-side Leader Step-Down Stamp flag that the constructor loads from
 * {@code SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP}.
 *
 * @return the configured value of the consume flag
 */
public boolean isLeaderHandoverConsumeStepDownStampEnabled() {
return this.leaderHandoverConsumeStepDownStamp;
}

/**
 * Returns the ack timeout for the Leader Step-Down Stamp emit, as loaded by the constructor from
 * {@code SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS}.
 *
 * @return the configured timeout, in milliseconds
 */
public long getLeaderHandoverEmitStepDownStampAckTimeoutMs() {
return this.leaderHandoverEmitStepDownStampAckTimeoutMs;
}

/**
 * Returns the value the constructor loads from {@code SERVER_INGESTION_INFO_LOG_LINE_LIMIT}.
 *
 * @return the configured ingestion info log line limit
 */
public int getServerIngestionInfoLogLineLimit() {
return this.serverIngestionInfoLogLineLimit;
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,29 @@ enum LatchStatus {
*/
private volatile long highestLeadershipTerm = -1;

/**
* The leadership term assigned to this replica when it most recently transitioned
* STANDBY -> LEADER. Sourced from the Helix state-transition message create-timestamp,
* matching the convention already used by the existing DoL stamp (see
* {@code LeaderFollowerPartitionStateModel#onBecomeLeaderFromStandby}). Set to -1 whenever
* this replica is not currently a leader. Used by:
* <ul>
* <li>The Leader Step-Down Stamp emit path on LEADER -> STANDBY/OFFLINE — copied into
* {@code LeaderMetadata.termId} of the stamp so the next leader can compare against
* its own term.</li>
* <li>The consume-side fast-path check on the new leader — used as the
* strictly-greater-than reference when comparing against an observed stamp's
* {@code LeaderMetadata.termId}.</li>
* </ul>
* Caveat: the Helix create-timestamp is a wall-clock value and is not strictly monotonic
* across the replica fleet. Two leaders promoted within the same millisecond — or under
* clock skew — could produce indistinguishable terms. Acceptable for the cooperative-only
* fast path; the legacy 5-minute wait is the fallback whenever the term inequality cannot
* be established. Full {@code termId}-based fencing is the structural fix tracked
* separately.
*/
private volatile long currentLeaderTermId = -1;

/**
* This future is completed in drainer thread after persisting the associated record and offset to DB.
*/
Expand Down Expand Up @@ -903,6 +926,18 @@ public void setHighestLeadershipTerm(long term) {
this.highestLeadershipTerm = term;
}

/**
 * Reads the volatile {@code currentLeaderTermId} field.
 *
 * @return the currently recorded leader term id; {@code -1} is both the field's initial value and
 *         the value written by {@link #clearCurrentLeaderTermId()}
 */
public long getCurrentLeaderTermId() {
return this.currentLeaderTermId;
}

/**
 * Overwrites the volatile {@code currentLeaderTermId} field. No validation is applied to the value.
 *
 * @param term the leader term id to record
 */
public void setCurrentLeaderTermId(long term) {
this.currentLeaderTermId = term;
}

/**
 * Resets {@code currentLeaderTermId} to {@code -1}, the same value the field is initialized with.
 */
public void clearCurrentLeaderTermId() {
this.currentLeaderTermId = -1;
}

/**
 * Replaces the stored {@code lastLeaderPersistFuture} reference with the given future.
 *
 * @param future the future to store; it is assigned as-is, without any null check
 */
public void setLastLeaderPersistFuture(Future<Void> future) {
this.lastLeaderPersistFuture = future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4541,8 +4541,14 @@ protected void validateMessage(
boolean tolerateMissingMessagesForRealTimeTopic) {
KafkaKey key = consumerRecord.getKey();
if (key.isControlMessage() && (Arrays.equals(KafkaKey.HEART_BEAT.getKey(), key.getKey())
|| Arrays.equals(KafkaKey.DOL_STAMP.getKey(), key.getKey()))) {
return; // Skip validation for ingestion heartbeat and DoL stamp records.
|| Arrays.equals(KafkaKey.DOL_STAMP.getKey(), key.getKey())
|| Arrays.equals(KafkaKey.LEADER_STEPDOWN_STAMP.getKey(), key.getKey()))) {
/*
* Skip DIV for self-contained control stamps. Heartbeat / DoL / Leader Step-Down stamps
* each use a dedicated type-3 producer GUID with segmentNumber=0, sequenceNumber=0 and do
* not participate in the per-segment DIV chain.
*/
return;
} else if (isGlobalRtDivEnabled() && isRecordSelfProduced(consumerRecord)) {
// Skip validation for self-produced records. If there were any issues, the followers would've reported it already
// e.g. Leader->Follower, resubscribe to local VT, consume messages produced by itself (when it was leader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
private final LongAdderRateGauge batchProcessingRequestRecordsSensor;
private final Sensor batchProcessingRequestLatencySensor;
private final LongAdderRateGauge batchProcessingRequestErrorSensor;
private final Sensor leaderStepdownStampEmitSuccessSensor;
private final Sensor leaderStepdownStampEmitFailureSensor;

/**
* @param totalStats the total stats singleton instance, or null if we are constructing the total stats
Expand Down Expand Up @@ -388,6 +390,18 @@ public HostLevelIngestionStats(
() -> totalStats.resubscriptionFailureSensor,
new Count());

this.leaderStepdownStampEmitSuccessSensor = registerPerStoreAndTotalSensor(
"leader_stepdown_stamp_emit_success",
totalStats,
() -> totalStats.leaderStepdownStampEmitSuccessSensor,
new Count());

this.leaderStepdownStampEmitFailureSensor = registerPerStoreAndTotalSensor(
"leader_stepdown_stamp_emit_failure",
totalStats,
() -> totalStats.leaderStepdownStampEmitFailureSensor,
new Count());

this.leaderProducerSynchronizeLatencySensor = registerPerStoreAndTotalSensor(
"leader_producer_synchronize_latency",
totalStats,
Expand Down Expand Up @@ -652,6 +666,14 @@ public void recordResubscriptionFailure() {
resubscriptionFailureSensor.record();
}

/**
 * Records one occurrence on the sensor registered under the name
 * {@code "leader_stepdown_stamp_emit_success"}.
 */
public void recordLeaderStepdownStampEmitSuccess() {
leaderStepdownStampEmitSuccessSensor.record();
}

/**
 * Records one occurrence on the sensor registered under the name
 * {@code "leader_stepdown_stamp_emit_failure"}.
 */
public void recordLeaderStepdownStampEmitFailure() {
leaderStepdownStampEmitFailureSensor.record();
}

/**
 * Records the given value on the sensor registered under the name
 * {@code "leader_producer_synchronize_latency"}.
 *
 * @param latency the latency sample to record (unit is not visible here; confirm against callers)
 */
public void recordLeaderProducerSynchronizeLatency(double latency) {
leaderProducerSynchronizeLatencySensor.record(latency);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
Expand Down Expand Up @@ -413,6 +416,34 @@ public void testVeniceWriterInProcessConsumerAction() throws InterruptedExceptio
lazyMockWriter.get();
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockWriter, times(1)).closePartition(0);

// case 4: Leader Step-Down stamp emit path (success):
// - emit flag on, leader term recorded, writer returns completed future -> sendLeaderStepDownStamp is invoked.
when(mockVeniceServerConfig.isLeaderHandoverEmitStepDownStampEnabled()).thenReturn(true);
when(mockVeniceServerConfig.getLeaderHandoverEmitStepDownStampAckTimeoutMs()).thenReturn(1000L);
when(mockPartitionConsumptionState.getCurrentLeaderTermId()).thenReturn(123456789L);
when(mockWriter.sendLeaderStepDownStamp(any(PubSubTopicPartition.class), isNull(), anyLong(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(mock(PubSubProduceResult.class)));
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockWriter, times(1))
.sendLeaderStepDownStamp(any(PubSubTopicPartition.class), isNull(), eq(123456789L), anyInt());
// currentLeaderTermId is cleared as part of the L->Standby tail.
verify(mockPartitionConsumptionState, atLeastOnce()).clearCurrentLeaderTermId();

// case 5: Leader Step-Down stamp emit path (timeout): future.get times out -> failure metric, no crash.
CompletableFuture<PubSubProduceResult> stuckFuture = new CompletableFuture<>();
when(mockWriter.sendLeaderStepDownStamp(any(PubSubTopicPartition.class), isNull(), anyLong(), anyInt()))
.thenReturn(stuckFuture);
when(mockVeniceServerConfig.getLeaderHandoverEmitStepDownStampAckTimeoutMs()).thenReturn(10L);
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
// closePartition is invoked again on this transition.
verify(mockWriter, atLeast(2)).closePartition(0);

// case 6: Emit flag off -> sendLeaderStepDownStamp is NOT invoked beyond case 4/5 invocation count.
when(mockVeniceServerConfig.isLeaderHandoverEmitStepDownStampEnabled()).thenReturn(false);
clearInvocations(mockWriter);
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockWriter, times(0)).sendLeaderStepDownStamp(any(), any(), anyLong(), anyInt());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,24 @@ public void testGetOrCreateCachedHeartbeatKeyResolvesRegularUnchunked() {
assertEquals(k.getLocality(), VeniceRegionLocality.LOCAL);
}

/**
 * Walks {@code currentLeaderTermId} through its full lifecycle on a fresh
 * {@link PartitionConsumptionState}: initial sentinel, explicit assignment, then reset.
 */
@Test
public void testCurrentLeaderTermIdLifecycle() {
  final long unsetTerm = -1L;
  final long assignedTerm = 1000L;
  OffsetRecord offsetRecord = mock(OffsetRecord.class);
  PartitionConsumptionState state =
      new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false, false, false, null);

  // A newly constructed state has no leader term recorded yet.
  assertEquals(state.getCurrentLeaderTermId(), unsetTerm);

  // An explicitly assigned term is read back unchanged.
  state.setCurrentLeaderTermId(assignedTerm);
  assertEquals(state.getCurrentLeaderTermId(), assignedTerm);

  // Clearing returns the state to the "no term" sentinel.
  state.clearCurrentLeaderTermId();
  assertEquals(state.getCurrentLeaderTermId(), unsetTerm);
}

@DataProvider(name = "batchPushRecordCountCases")
public static Object[][] batchPushRecordCountCases() {
// { description, priorCountInOffsetRecord, incrementsAfterConstruction, expectedFinalCount }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,59 @@ private ConfigKeys() {
public static final String SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES =
"server.leader.handover.use.dol.mechanism.for.user.stores";

/**
* Controls whether a leader replica emits a Leader Step-Down Stamp to the local version topic
* (VT) when it receives a Helix Leader -> Standby (or Leader -> Offline) state transition.
*
* <p>The stamp is a DoL-style self-contained control message identified on the wire by
* {@code KafkaKey.LEADER_STEPDOWN_STAMP}, with its own producer GUID and fixed
* {@code (segmentNumber=0, sequenceNumber=0)}. It carries the demoting leader's
* {@code LeaderMetadata.termId}. A new leader (when its consume-side flag is on) uses the
* stamp to skip the legacy 5-minute inactivity wait in {@code canSwitchToLeaderTopicLegacy}.
*
* <p>Self-contained design: the stamp does not depend on the leader's own segment state, so it
* is always emittable regardless of whether the leader had an open segment. This avoids races
* with straggler writes and the "leader had nothing to flush" edge case.
*
* <p>Emitting the stamp is a no-op for replicas reading without the consume-side flag set, so
* this flag is safe to enable fleet-wide before flipping the consume-side flag.
*
* Default: true (emit the step-down stamp on every L -> Standby/Offline transition)
*/
public static final String SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP = "server.leader.handover.emit.stepdown.stamp";
Comment thread
sushantmane marked this conversation as resolved.

/**
* Controls whether a new leader replica honors a Leader Step-Down Stamp (see
* {@link #SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP}) when deciding whether to skip the
* legacy 5-minute inactivity wait during Standby -> Leader transition.
*
* <p>When enabled, after the DoL loopback completes the new leader checks the most recent
* non-self Step-Down Stamp it has observed on the local VT. If the stamp's
* {@code LeaderMetadata.termId} is strictly less than the new leader's own term, the new
* leader bypasses the legacy 5-minute wait and immediately proceeds to switch to the leader
* source topic.
*
* <p>If the stamp is absent, stale, or from the new leader's own term, the new leader falls
* back to the legacy 5-minute inactivity wait. The fallback is byte-for-byte equivalent to the
* pre-change behavior.
*
* Default: false (legacy 5-minute wait is preserved until the feature is validated)
*/
public static final String SERVER_LEADER_HANDOVER_CONSUME_STEPDOWN_STAMP =
"server.leader.handover.consume.stepdown.stamp";

/**
* Bounded timeout (in milliseconds) the outgoing leader will wait for the broker to ack the
* Leader Step-Down Stamp emitted on Leader -> Standby / Offline. After the timeout, the leader
* proceeds with the rest of the demotion sequence regardless. If the ack never arrives, the
* new leader simply will not observe the stamp and will fall back to the legacy 5-minute
* wait. Safety is unaffected.
*
* Default: 5000 ms
*/
public static final String SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS =
"server.leader.handover.emit.stepdown.stamp.ack.timeout.ms";

public static final String SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS =
"server.netty.graceful.shutdown.period.seconds";
public static final String SERVER_NETTY_WORKER_THREADS = "server.netty.worker.threads";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.linkedin.venice.guid;

import com.linkedin.venice.kafka.protocol.GUID;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;


/**
 * A GUID generator for Leader Step-Down Stamp control messages.
 *
 * <p>Uses {@link java.util.UUID#nameUUIDFromBytes(byte[])} to generate a type 3 GUID that will
 * not collide with type 4 GUIDs generated for user data using {@link JavaUtilGuidV4Generator},
 * nor with the type-3 GUIDs used for heartbeats and Declaration of Leadership stamps.
 *
 * <p>This GUID identifies the Leader Step-Down message <b>type</b> (not individual messages).
 * All Leader Step-Down stamps share the same GUID, which allows receivers to distinguish them
 * from data, heartbeats, and DoL stamps. Individual stamps are distinguished by their payload
 * (specifically the {@code LeaderMetadata.termId} that identifies the term being closed).
 *
 * <p>Thread-safety: the singleton and its GUID are created once during class initialization and
 * held in {@code static final} fields, so {@link #getInstance()} needs no locking. Note that
 * {@link #getGuid()} hands out the same shared {@link GUID} object on every call; callers are
 * expected to treat it as read-only.
 */
public class LeaderStepDownStampGuidGenerator implements GuidGenerator {
  /** Name hashed by {@link UUID#nameUUIDFromBytes(byte[])}; changing it changes the GUID on the wire. */
  private static final String GUID_NAME = "leaderStepDownStampControlMessage";

  private static final GUID LEADER_STEPDOWN_STAMP_GUID = buildGuid();

  /** Eagerly created: the generator is stateless, so there is nothing to gain from lazy creation. */
  private static final LeaderStepDownStampGuidGenerator INSTANCE = new LeaderStepDownStampGuidGenerator();

  private LeaderStepDownStampGuidGenerator() {
  }

  /**
   * @return the process-wide generator instance
   */
  public static LeaderStepDownStampGuidGenerator getInstance() {
    return INSTANCE;
  }

  /**
   * @return the single GUID shared by all Leader Step-Down stamps
   */
  @Override
  public GUID getGuid() {
    return LEADER_STEPDOWN_STAMP_GUID;
  }

  /** Builds the 16-byte GUID from the name-based UUID, most significant bits first. */
  private static GUID buildGuid() {
    UUID nameBasedUuid = UUID.nameUUIDFromBytes(GUID_NAME.getBytes(StandardCharsets.UTF_8));
    byte[] guidBytes = ByteBuffer.allocate(16)
        .putLong(nameBasedUuid.getMostSignificantBits())
        .putLong(nameBasedUuid.getLeastSignificantBits())
        .array();
    GUID guid = new GUID();
    guid.bytes(guidBytes);
    return guid;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.venice.guid.DoLStampGuidGenerator;
import com.linkedin.venice.guid.HeartbeatGuidV3Generator;
import com.linkedin.venice.guid.LeaderStepDownStampGuidGenerator;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.memory.ClassSizeEstimator;
Expand Down Expand Up @@ -59,6 +60,27 @@ public class KafkaKey implements Measurable {
.putInt(0) // sequence number
.array());

/**
* Special key for Leader Step-Down Stamp control messages.
*
* <p>Emitted by a leader replica during the Helix LEADER -> STANDBY (or LEADER -> OFFLINE)
* transition to advertise that it has voluntarily finished producing for its term. Mirrors the
* shape of {@link #DOL_STAMP}: dedicated producer GUID via
* {@link LeaderStepDownStampGuidGenerator}, {@code segmentNumber = 0}, {@code messageSequenceNumber = 0}.
*
* <p>Self-contained: does not depend on the leader's current segment state, so it is always
* emittable regardless of whether the leader had an open segment. Identified by the receiver
* via this {@code KafkaKey}; the closed term is read from {@code LeaderMetadata.termId} in the
* envelope footer.
*/
public static final KafkaKey LEADER_STEPDOWN_STAMP = new KafkaKey(
Comment thread
sushantmane marked this conversation as resolved.
MessageType.CONTROL_MESSAGE,
ByteBuffer.allocate(CONTROL_MESSAGE_KAFKA_KEY_LENGTH)
.put(LeaderStepDownStampGuidGenerator.getInstance().getGuid().bytes())
.putInt(0) // segment number
.putInt(0) // sequence number
.array());

private final byte keyHeaderByte;
private final byte[] key; // TODO: Consider whether we may want to use a ByteBuffer here

Expand Down
Loading
Loading