Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public abstract CompletableFuture<Void> execSyncOffsetCommandAsync(
PubSubTopicPartition topicPartition,
StoreIngestionTask ingestionTask) throws InterruptedException;

public abstract void execSyncOffsetFromSnapshotAsync(
public abstract CompletableFuture<Void> execSyncGlobalRtDivAsync(
PubSubTopicPartition topicPartition,
StoreIngestionTask ingestionTask) throws InterruptedException;

public abstract CompletableFuture<Void> execSyncOffsetFromSnapshotAsync(
PubSubTopicPartition topicPartition,
PartitionTracker vtDivSnapshot,
CompletableFuture<Void> lastRecordPersistedFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2261,7 +2261,8 @@ protected void produceToLocalKafka(
try {
if (shouldSendGlobalRtDiv(consumerRecord, pcs, kafkaUrl)) {
sendGlobalRtDivMessage(
consumerRecord,
consumerRecord.getPosition(),
consumerRecord.getTopicPartition(),
pcs,
partition,
kafkaUrl,
Expand Down Expand Up @@ -2411,20 +2412,120 @@ void addVtDivToProducerCallbackIfNeeded(
* Shared by {@link #createGlobalRtDivCallback} (leader-RT-source) and
* {@link #addVtDivToProducerCallbackIfNeeded} (leader-VT-source).
*/
private void sendVtDivSnapshotOnCompletion(
private CompletableFuture<Void> sendVtDivSnapshotOnCompletion(
LeaderProducerCallback callback,
PubSubTopicPartition topicPartition,
PartitionTracker vtDiv,
CompletableFuture<Void> persistedToDBFuture) {
// Relay future the leader graceful-shutdown path awaits. It completes when the drainer-side VT DIV sync node has
// run. The leader-produce callback only fires on produce success, so also fail the relay if the produce/persist
// fails — otherwise the shutdown await would hang until its timeout instead of completing promptly.
CompletableFuture<Void> vtDivSyncedFuture = new CompletableFuture<>();
persistedToDBFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
vtDivSyncedFuture.completeExceptionally(throwable);
}
});
callback.setOnCompletionCallback(produceResult -> {
try {
vtDiv.updateLatestConsumedVtPosition(produceResult.getPubSubPosition());
storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, persistedToDBFuture, this);
storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, persistedToDBFuture, this)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
vtDivSyncedFuture.completeExceptionally(throwable);
} else {
vtDivSyncedFuture.complete(null);
}
});
} catch (InterruptedException e) {
LOGGER.error("event=globalRtDiv Failed to async VT DIV OffsetRecord sync for replica: {}", topicPartition, e);
Thread.currentThread().interrupt();
vtDivSyncedFuture.completeExceptionally(e);
}
});
return vtDivSyncedFuture;
}

/**
* On-demand flush of both the RT and VT DIV state, invoked at graceful shutdown (see
* {@link StoreIngestionTask#executeShutdownRunnable}). Decoupled from the byte-threshold triggers.
*
* <ul>
* <li><b>Leader:</b> for each RT source broker, produce one {@link GlobalRtDivState} (carrying the latest consumed
* RT position, LCRP) to the local VT via {@link #sendGlobalRtDivMessage}. Its produce-completion callback enqueues
* a single waitable VT DIV + LCVP sync node into the FIFO drainer (carrying the produced LCVP), so the RT produce
* covers both halves; {@code sendGlobalRtDivMessage} returns that node's future, which completes only after the RT
* DIV produce has persisted and the VT DIV sync has run. Brokers whose LCRP is
* {@link PubSubSymbolicPosition#EARLIEST} are skipped (no RT progress yet).</li>
* <li><b>Follower / leader with no RT progress or no RT brokers:</b> force a single waitable VT DIV snapshot sync.
* RT DIV is already durable in the StorageEngine from when the follower consumed {@link GlobalRtDivState}.</li>
* </ul>
*
* @return a future completing once all produces have persisted and the corresponding VT DIV syncs have run.
*/
@Override
protected CompletableFuture<Void> forceGlobalRtDivSync(PartitionConsumptionState pcs) {
int partition = pcs.getPartition();
PubSubTopicPartition localVtTopicPartition = pcs.getReplicaTopicPartition();
List<CompletableFuture<Void>> futures = new ArrayList<>();

if (pcs.getLeaderFollowerState().equals(LEADER)) {
Set<String> realTimeDataSourceKafkaURLs = getRealTimeDataSourceKafkaAddress(pcs);
for (String brokerUrl: realTimeDataSourceKafkaURLs) {
PubSubPosition lcrp = pcs.getLatestConsumedRtPosition(brokerUrl);
// Skip brokers with no RT progress: producing/persisting EARLIEST would force a re-subscribe from EARLIEST on
// restart (mirrors the EARLIEST-LCVP guard in sendVtDivSnapshotIfNeeded).
if (PubSubSymbolicPosition.EARLIEST.equals(lcrp)) {
continue;
}
// Resolve the cluster id from the canonical url->id reverse map (it also carries alias / _sep-topic url
// variants that the forward id->url map lacks), matching how the steady-state RT consume path resolves it.
int kafkaClusterId = getServerConfig().getKafkaClusterUrlToIdMap().getOrDefault(brokerUrl, -1);
try {
// sendGlobalRtDivMessage's produce-completion callback enqueues the waitable VT DIV sync node (carrying the
// produced LCVP) into the FIFO drainer. Await that node's future directly: it completes only after the RT DIV
// produce has persisted and the VT DIV + LCVP have been synced to the OffsetRecord — no second redundant
// sync.
futures.add(
sendGlobalRtDivMessage(
lcrp,
localVtTopicPartition,
pcs,
partition,
brokerUrl,
System.nanoTime(),
kafkaClusterId));
} catch (Exception e) {
LOGGER.error(
"event=globalRtDiv Failed to force Global RT DIV sync for replica: {} broker: {}",
localVtTopicPartition,
brokerUrl,
e);
}
}
}

// Follower, or leader with no RT progress / no RT brokers: force a waitable VT DIV snapshot sync.
if (futures.isEmpty()) {
futures.add(enqueueWaitableVtDivSync(localVtTopicPartition));
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

/**
* Enqueues a waitable Global RT DIV sync on the drainer, returning a future that fails (rather than throwing) if
* interrupted so the graceful-shutdown await never hangs.
*/
private CompletableFuture<Void> enqueueWaitableVtDivSync(PubSubTopicPartition localVtTopicPartition) {
try {
return storeBufferService.execSyncGlobalRtDivAsync(localVtTopicPartition, this);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(e);
return failed;
}
}

@Override
Expand Down Expand Up @@ -4351,21 +4452,24 @@ public byte[] getGlobalRtDivKeyBytes(int partitionId, String brokerUrl) {
* Upon completion, the {@link LeaderProducerCallback} will write the {@link GlobalRtDivState} to the StorageEngine.
* When the drainer receives a Global RT DIV, that is the signal to sync the VT DIV to the OffsetRecord.
* NOTE: This method is called per-broker. The broker url is included in the key.
* @param previousMessage the last RT message that was validated and produced to kafka before this GlobalRtDiv
* will be produced.
* @param previousPosition the latest consumed RT position (LCRP) up to which the RT DIV has been validated;
* serialized into the {@link GlobalRtDivState} and stamped on the produced record.
* @param topicPartition the topic-partition of the record that triggered this produce (used for routing/logging).
* @return a future that completes once the produce has persisted and the chained VT DIV + LCVP sync node has run on
* the drainer; the graceful-shutdown leader path awaits it, while steady-state callers ignore it.
*/
void sendGlobalRtDivMessage(
DefaultPubSubMessage previousMessage,
CompletableFuture<Void> sendGlobalRtDivMessage(
PubSubPosition previousPosition,
PubSubTopicPartition topicPartition,
PartitionConsumptionState pcs,
int partition,
String brokerUrl,
long beforeProcessingRecordTimestampNs,
int kafkaClusterId) {
final byte[] keyBytes = getGlobalRtDivKeyBytes(partition, brokerUrl);
final PubSubTopicPartition topicPartition = previousMessage.getTopicPartition();
TopicType realTimeTopicType = TopicType.of(REALTIME_TOPIC_TYPE, brokerUrl);
LeaderMetadataWrapper leaderMetadataWrapper =
new LeaderMetadataWrapper(previousMessage.getPosition(), kafkaClusterId, DEFAULT_TERM_ID);
new LeaderMetadataWrapper(previousPosition, kafkaClusterId, DEFAULT_TERM_ID);

// Snapshot the RT DIV (single broker URL) in preparation to be produced
// VT DIV contains the latest consumed VT position (LCVP)
Expand All @@ -4375,20 +4479,29 @@ void sendGlobalRtDivMessage(
Map<CharSequence, ProducerPartitionState> rtDivPartitionStates = rtDiv.getPartitionStates(realTimeTopicType);

// Create GlobalRtDivState (RT DIV + LCRP) and serialize into a byte array. Try compression.
final byte[] valueBytes = createGlobalRtDivValueBytes(previousMessage, brokerUrl, rtDivPartitionStates);
final byte[] valueBytes =
createGlobalRtDivValueBytes(previousPosition, topicPartition, brokerUrl, rtDivPartitionStates);

// The callback onCompletionFunction sends the VT DIV + LCVP to the drainer after producing to VT successfully
final LeaderProducerCallback divCallback = createGlobalRtDivCallback(
previousMessage,
previousPosition,
pcs,
partition,
brokerUrl,
beforeProcessingRecordTimestampNs,
kafkaClusterId,
keyBytes,
valueBytes,
topicPartition);

// Install the produce-completion callback that sends the VT DIV + LCVP to the drainer after producing to VT
// successfully. Must be set before the produce below so the callback is registered before the produce can complete.
// The returned future completes once that drainer-side VT DIV sync has run; the graceful-shutdown leader path
// awaits it while steady-state callers ignore it.
CompletableFuture<Void> vtDivSyncedFuture = sendVtDivSnapshotOnCompletion(
divCallback,
topicPartition,
vtDiv);
vtDiv,
divCallback.getLeaderProducedRecordContext().getPersistedToDBFuture());

// Read the old manifest (if any) so VeniceWriter can delete orphaned old chunks in Kafka.
ChunkedValueManifestContainer valueManifestContainer = new ChunkedValueManifestContainer();
Expand All @@ -4407,7 +4520,7 @@ void sendGlobalRtDivMessage(
"event=globalRtDiv Sending Global RT DIV message for topic-partition: {} versionTopic: {} LCRP: {} broker: {} producerCount: {}, valueSize: {}",
topicPartition,
versionTopic,
previousMessage.getPosition(),
previousPosition,
brokerUrl,
rtDivPartitionStates.size(),
valueBytes.length);
Expand All @@ -4429,13 +4542,14 @@ void sendGlobalRtDivMessage(
true);

pcs.resetConsumedBytesSinceLastGlobalRtDivSync(brokerUrl);
return vtDivSyncedFuture;
}

private byte[] createGlobalRtDivValueBytes(
DefaultPubSubMessage previousMessage,
PubSubPosition previousPosition,
PubSubTopicPartition topicPartition,
String brokerUrl,
Map<CharSequence, ProducerPartitionState> rtDivPartitionStates) {
final PubSubPosition previousPosition = previousMessage.getPosition();
GlobalRtDivState globalRtDiv =
new GlobalRtDivState(brokerUrl, rtDivPartitionStates, previousPosition.toWireFormatBuffer());
byte[] valueBytes = ByteUtils.extractByteArray(globalRtDivStateSerializer.serialize(globalRtDiv));
Expand All @@ -4444,24 +4558,23 @@ private byte[] createGlobalRtDivValueBytes(
} catch (IOException e) {
LOGGER.error(
"Failed to compress GlobalRtDivState for replica: {}. Will proceed without {} compression.",
previousMessage.getTopicPartition(),
topicPartition,
compressionStrategy,
e);
}
return valueBytes;
}

private LeaderProducerCallback createGlobalRtDivCallback(
DefaultPubSubMessage prevMessage,
PubSubPosition previousPosition,
PartitionConsumptionState pcs,
int partition,
String brokerUrl,
long beforeProcessingRecordTimestampNs,
int kafkaClusterId,
byte[] keyBytes,
byte[] valueBytes,
PubSubTopicPartition topicPartition,
PartitionTracker vtDiv) {
PubSubTopicPartition topicPartition) {
final int schemaId = AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion();
KafkaKey divKey = new KafkaKey(MessageType.GLOBAL_RT_DIV, keyBytes);
KafkaMessageEnvelope divEnvelope = getVeniceWriter(pcs).get()
Expand All @@ -4481,18 +4594,12 @@ private LeaderProducerCallback createGlobalRtDivCallback(
divKey,
divEnvelope,
topicPartition,
prevMessage.getPosition(),
previousPosition,
System.currentTimeMillis(),
divKey.getKeyLength() + valueBytes.length);
LeaderProducedRecordContext context =
LeaderProducedRecordContext.newPutRecord(kafkaClusterId, prevMessage.getPosition(), keyBytes, put);
LeaderProducerCallback divCallback =
createProducerCallback(divMessage, pcs, context, partition, brokerUrl, beforeProcessingRecordTimestampNs);

// After producing the RT DIV to local VT, the drainer should sync the VT DIV + LCVP within the OffsetRecord
sendVtDivSnapshotOnCompletion(divCallback, topicPartition, vtDiv, context.getPersistedToDBFuture());

return divCallback;
LeaderProducedRecordContext.newPutRecord(kafkaClusterId, previousPosition, keyBytes, put);
return createProducerCallback(divMessage, pcs, context, partition, brokerUrl, beforeProcessingRecordTimestampNs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public LeaderProducerCallback(
this.beforeProcessingRecordTimestampNs = beforeProcessingRecordTimestampNs;
}

LeaderProducedRecordContext getLeaderProducedRecordContext() {
return leaderProducedRecordContext;
}

@Override
public void onCompletion(PubSubProduceResult produceResult, Exception e) {
this.onCompletionFunction.accept(produceResult);
Comment thread
KaiSernLim marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,19 @@ public CompletableFuture<Void> execSyncOffsetCommandAsync(
}

@Override
public void execSyncOffsetFromSnapshotAsync(
public CompletableFuture<Void> execSyncGlobalRtDivAsync(
PubSubTopicPartition topicPartition,
StoreIngestionTask ingestionTask) throws InterruptedException {
return getDelegate(ingestionTask).execSyncGlobalRtDivAsync(topicPartition, ingestionTask);
}

@Override
public CompletableFuture<Void> execSyncOffsetFromSnapshotAsync(
PubSubTopicPartition topicPartition,
PartitionTracker vtDivSnapshot,
CompletableFuture<Void> lastRecordPersistedFuture,
StoreIngestionTask ingestionTask) throws InterruptedException {
getDelegate(ingestionTask)
return getDelegate(ingestionTask)
.execSyncOffsetFromSnapshotAsync(topicPartition, vtDivSnapshot, lastRecordPersistedFuture, ingestionTask);
}

Expand Down
Loading
Loading