@@ -62,7 +62,6 @@
import com.linkedin.venice.pubsub.api.exceptions.PubSubTopicDoesNotExistException;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
@@ -1552,12 +1551,10 @@ protected boolean handleVersionSwapControlMessage(
: Collections.emptyList();
}

if (RmdUtils.hasOffsetAdvanced(localOffset, highWatermarkOffsets)) {
currentVersionHighWatermarks
.putIfAbsent(pubSubTopicPartition.getPartitionNumber(), new ConcurrentHashMap<>());
currentVersionHighWatermarks.get(pubSubTopicPartition.getPartitionNumber())
.put(upstreamPartition, highWatermarkOffsets);
}
currentVersionHighWatermarks
.putIfAbsent(pubSubTopicPartition.getPartitionNumber(), new ConcurrentHashMap<>());
currentVersionHighWatermarks.get(pubSubTopicPartition.getPartitionNumber())
.put(upstreamPartition, highWatermarkOffsets);
Comment on lines +1554 to +1557

Copilot AI Feb 12, 2026

The regression-prevention logic in this method is now inconsistent: localOffset is still computed (and the surrounding comments mention preventing high-watermark regressions), but the hasOffsetAdvanced guard was removed and the map is overwritten unconditionally. Either remove the stale localOffset/comment block, or restore a monotonicity check (e.g., via DiffValidationUtils.hasOffsetAdvanced) before updating currentVersionHighWatermarks.

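One possible shape for the restored guard, sketched under the assumption that a helper like DiffValidationUtils.hasOffsetAdvanced(baseOffset, advancedOffset) exists with the same semantics as the removed RmdUtils method (names are illustrative, not part of this PR):

// Only advance the stored high watermark when the new vector is not behind
// the locally tracked one, mirroring the removed regression guard.
if (DiffValidationUtils.hasOffsetAdvanced(localOffset, highWatermarkOffsets)) {
  currentVersionHighWatermarks
      .computeIfAbsent(pubSubTopicPartition.getPartitionNumber(), k -> new ConcurrentHashMap<>())
      .put(upstreamPartition, highWatermarkOffsets);
}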
switchToNewTopic(newServingVersionTopic, topicSuffix, pubSubTopicPartition.getPartitionNumber());
chunkAssembler.clearBuffer();

@@ -1642,13 +1639,6 @@ private boolean filterRecordByVersionSwapHighWatermarks(
List<Long> recordCheckpointVector,
PubSubTopicPartition pubSubTopicPartition,
Integer upstreamPartition) {
int partitionId = pubSubTopicPartition.getPartitionNumber();
List<Long> localOffset = (List<Long>) currentVersionHighWatermarks.getOrDefault(partitionId, Collections.EMPTY_MAP)
.getOrDefault(upstreamPartition, new ArrayList<>());
if (recordCheckpointVector != null) {
return !RmdUtils.hasOffsetAdvanced(localOffset, recordCheckpointVector);
}
// Has not met version swap message after client initialization.
return false;
}

@@ -495,8 +495,6 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
beforeProcessingBatchRecordsTimestampMs);

final long writeTimestamp = getWriteTimestampFromKME(kafkaValue);
final long offsetSumPreOperation =
rmdWithValueSchemaID != null ? RmdUtils.extractOffsetVectorSumFromRmd(rmdWithValueSchemaID.getRmdRecord()) : 0;
List<Long> recordTimestampsPreOperation = rmdWithValueSchemaID != null
? RmdUtils.extractTimestampFromRmd(rmdWithValueSchemaID.getRmdRecord())
: Collections.singletonList(0L);
Comment on lines 498 to 500

Copilot AI Feb 12, 2026

recordTimestampsPreOperation is now only computed to be passed into validatePostOperationResultsAndRecord, but the validation method no longer uses it. Consider removing this extraction (and possibly the validation call) to avoid unnecessary per-record work.

Suggested change
List<Long> recordTimestampsPreOperation = rmdWithValueSchemaID != null
? RmdUtils.extractTimestampFromRmd(rmdWithValueSchemaID.getRmdRecord())
: Collections.singletonList(0L);

@@ -578,7 +576,7 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
if (rmdWithValueSchemaID != null) {
aggVersionedIngestionStats.recordTotalDuplicateKeyUpdate(storeName, versionNumber);
}
validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation);
validatePostOperationResultsAndRecord(mergeConflictResult, recordTimestampsPreOperation);

final ByteBuffer updatedValueBytes = maybeCompressData(
consumerRecord.getTopicPartition().getPartitionNumber(),
@@ -741,23 +739,11 @@ private long getWriteTimestampFromKME(KafkaMessageEnvelope kme) {

private void validatePostOperationResultsAndRecord(
MergeConflictResult mergeConflictResult,
Long offsetSumPreOperation,
List<Long> timestampsPreOperation) {
// Nothing was applied, no harm no foul
if (mergeConflictResult.isUpdateIgnored()) {
return;
}

Copilot AI Feb 12, 2026

validatePostOperationResultsAndRecord has become a no-op: even when the update is not ignored, the method performs no validation, metrics, or logging. If the offset/timestamp checks are intentionally removed, consider deleting this method and its call site to avoid dead code; otherwise, restore meaningful validation here (see the suggestion below).

Suggested change
}
}
// Basic sanity checks and diagnostics for applied updates.
if (timestampsPreOperation == null) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("timestampsPreOperation is null while processing a non-ignored merge result: {}", mergeConflictResult);
}
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Validated post-operation merge result. Update applied: {}, pre-operation timestamps count: {}",
!mergeConflictResult.isUpdateIgnored(),
timestampsPreOperation.size());
}

// Post Validation checks on resolution
GenericRecord rmdRecord = mergeConflictResult.getRmdRecord();
if (offsetSumPreOperation > RmdUtils.extractOffsetVectorSumFromRmd(rmdRecord)) {
// offsets went backwards, raise an alert!
hostLevelIngestionStats.recordOffsetRegressionDCRError();
aggVersionedIngestionStats.recordOffsetRegressionDCRError(storeName, versionNumber);
LOGGER.error(
"Offset vector found to have gone backwards for {}!! New invalid replication metadata result: {}",
storeVersionName,
rmdRecord);
}
}

/**
@@ -5,7 +5,7 @@

import com.linkedin.davinci.schema.merge.ValueAndRmd;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import java.util.List;
import java.util.ArrayList;
import org.apache.avro.generic.GenericRecord;


@@ -28,7 +28,7 @@ protected ValueAndRmd<T> putWithRecordLevelTimestamp(
// New value wins
oldValueAndRmd.setValue(newValue);
oldRmd.put(TIMESTAMP_FIELD_POS, putOperationTimestamp);
updateReplicationCheckpointVector(oldRmd, newValueSourcePosition, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldRmd);

} else if (oldTimestamp == putOperationTimestamp) {
// When timestamps tie, compare decide which one should win.
@@ -38,7 +38,7 @@
oldValueAndRmd.setUpdateIgnored(true);
} else {
oldValueAndRmd.setValue(newValue);
updateReplicationCheckpointVector(oldRmd, newValueSourcePosition, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldRmd);
}

} else {
@@ -60,24 +60,16 @@ protected ValueAndRmd<T> deleteWithValueLevelTimestamp(
// Still need to track the delete timestamp in order to reject future PUT record with lower replication timestamp
final GenericRecord oldRmd = oldValueAndRmd.getRmd();
oldRmd.put(TIMESTAMP_FIELD_POS, deleteOperationTimestamp);
updateReplicationCheckpointVector(oldRmd, newValueSourcePosition, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldRmd);

} else {
oldValueAndRmd.setUpdateIgnored(true);
}
return oldValueAndRmd;
}

protected void updateReplicationCheckpointVector(
GenericRecord oldRmd,
PubSubPosition newValueSourcePosition,
int newValueSourceBrokerID) {
oldRmd.put(
REPLICATION_CHECKPOINT_VECTOR_FIELD_POS,
MergeUtils.mergeOffsetVectors(
(List<Long>) oldRmd.get(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS),
newValueSourcePosition.getNumericOffset(),
newValueSourceBrokerID));
protected void updateReplicationCheckpointVector(GenericRecord oldRmd) {
oldRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());

Copilot AI Feb 12, 2026

updateReplicationCheckpointVector currently overwrites any existing replication_checkpoint_vector with a new empty ArrayList on every applied operation. This actively wipes non-empty vectors written by older servers and allocates a new list per merge. If the goal is to stop populating the vector, consider leaving the existing field untouched (or only initializing it to empty when it is null/missing), and prefer Collections.emptyList() if you just need an empty value.

Suggested change
oldRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());
Object existingVector = oldRmd.get(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS);
if (existingVector == null) {
oldRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, java.util.Collections.emptyList());
}

}

/**
@@ -575,10 +575,7 @@ private MergeConflictResult putWithoutRmd(
*/
GenericRecord newRmd = newRmdCreator.apply(newValueSchemaID);
newRmd.put(TIMESTAMP_FIELD_POS, putOperationTimestamp);
// A record which didn't come from an RT topic or has null metadata should have no offset vector.
newRmd.put(
REPLICATION_CHECKPOINT_VECTOR_FIELD_POS,
MergeUtils.mergeOffsetVectors(null, newValueSourcePosition.getNumericOffset(), newValueSourceBrokerID));
newRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());

Comment on lines 576 to 579

Copilot AI Feb 12, 2026

This change makes newly created RMDs always store an empty replication_checkpoint_vector. There are existing tests that assert this vector is populated (e.g., internal/venice-test-common/src/integrationTest/.../TestSeparateRealtimeTopicIngestion checks offset vector size/values). Please update those expectations or adjust the change to avoid breaking test coverage that still validates checkpoint vectors.

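If the empty-vector behavior is intentional, the affected assertions would need to flip to expect an empty checkpoint vector rather than specific offsets; a hypothetical adjustment (the surrounding test variables are assumed, not quoted from the repository):

// Expect the checkpoint vector to stay empty now that servers no longer populate it.
List<Long> offsetVector = RmdUtils.extractOffsetVectorFromRmd(rmdRecord);
Assert.assertTrue(offsetVector.isEmpty(), "Servers no longer populate the replication checkpoint vector");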
if (useFieldLevelTimestamp) {
Schema valueSchema = getValueSchema(newValueSchemaID);
@@ -603,9 +600,7 @@ private MergeConflictResult deleteWithoutRmd(
final int valueSchemaID = storeSchemaCache.getSupersetOrLatestValueSchema().getId();
GenericRecord newRmd = newRmdCreator.apply(valueSchemaID);
newRmd.put(TIMESTAMP_FIELD_POS, deleteOperationTimestamp);
newRmd.put(
REPLICATION_CHECKPOINT_VECTOR_FIELD_POS,
MergeUtils.mergeOffsetVectors(null, newValueSourcePosition.getNumericOffset(), deleteOperationSourceBrokerID));
newRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());
if (useFieldLevelTimestamp) {
Schema valueSchema = getValueSchema(valueSchemaID);
newRmd = createOldValueAndRmd(valueSchema, valueSchemaID, valueSchemaID, Lazy.of(() -> null), newRmd).getRmd();
@@ -116,7 +116,7 @@ private ValueAndRmd<GenericRecord> handlePutWithPerFieldLevelTimestamp(
GenericRecord newValue) {
final GenericRecord oldReplicationMetadata = oldValueAndRmd.getRmd();
final GenericRecord oldValue = oldValueAndRmd.getValue();
updateReplicationCheckpointVector(oldReplicationMetadata, sourcePositionOfNewValue, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldReplicationMetadata);

Comment on lines 116 to 120

Copilot AI Feb 12, 2026

updateReplicationCheckpointVector(oldReplicationMetadata) is called before any per-field merge happens, so it mutates the RMD even if all field updates are later ignored (noFieldUpdated). Consider only updating the checkpoint vector when at least one field update is actually applied.

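A minimal sketch of the deferred update, reusing the noFieldUpdated flag that the surrounding loop already maintains (placement is illustrative; the actual refactor would move the call below the per-field loop):

// Skip the checkpoint-vector update entirely when every field update was ignored.
if (!noFieldUpdated) {
  updateReplicationCheckpointVector(oldReplicationMetadata);
}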
List<Schema.Field> fieldsInNewRecord = newValue.getSchema().getFields();
boolean noFieldUpdated = true;
@@ -166,7 +166,7 @@ public ValueAndRmd<GenericRecord> delete(
oldValueAndRmd);

case PER_FIELD_TIMESTAMP:
updateReplicationCheckpointVector(oldReplicationMetadata, newValueSourcePosition, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldReplicationMetadata);
UpdateResultStatus recordDeleteResultStatus = mergeRecordHelper.deleteRecord(
oldValueAndRmd.getValue(),
(GenericRecord) tsObject,
@@ -195,7 +195,7 @@ public ValueAndRmd<GenericRecord> update(
int updateOperationColoID,
PubSubPosition newValueSourcePosition,
int newValueSourceBrokerID) {
updateReplicationCheckpointVector(oldValueAndRmd.getRmd(), newValueSourcePosition, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldValueAndRmd.getRmd());
return writeComputeProcessor.updateRecordWithRmd(
currValueSchema,
oldValueAndRmd,
@@ -1,10 +1,5 @@
package com.linkedin.davinci.replication.merge;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;


public class MergeUtils {
private MergeUtils() {
// Utility class
@@ -31,24 +26,4 @@ public static Object compareAndReturn(Object o1, Object o2) {
}
}

/**
* @return If the input {@param oldOffsetVector} is null, the returned value could be null.
*/
static @Nullable List<Long> mergeOffsetVectors(List<Long> oldOffsetVector, Long newOffset, int sourceBrokerID) {
if (sourceBrokerID < 0) {
// Can happen if we could not deduce the sourceBrokerID (can happen due to a misconfiguration)
// in such cases, we will not try to alter the existing offsetVector, instead just returning it.
return oldOffsetVector;
}
final List<Long> offsetVector = oldOffsetVector == null ? new ArrayList<>(sourceBrokerID) : oldOffsetVector;

// Making sure there is room available for the insertion (fastserde LongList can't be cast to arraylist)
// Lists in java require that gaps be filled, so first we fill any gaps by adding some initial offset values
int i = offsetVector.size();
for (; i <= sourceBrokerID; i++) {
offsetVector.add(i, 0L);
}
offsetVector.set(sourceBrokerID, newOffset);
return offsetVector;
}
}
@@ -3,21 +3,16 @@
import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME;

import com.linkedin.avro.fastserde.coldstart.ColdPrimitiveLongList;
import com.linkedin.avro.fastserde.primitive.PrimitiveLongArrayList;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


@@ -280,41 +275,6 @@ public void testPermutation() {
Assert.assertEquals(GenericData.get().compare(result1, result2, userSchemaV1), 0);
}

/**
* Data provider which provides for many list implementations because we've been hurt before :'(
*/
@DataProvider(name = "Long-Lists-and-null")
public static Object[][] listImplementationsProvider() {
return new Object[][] { { new ArrayList<Long>() }, { new PrimitiveLongArrayList(0) },
{ new ColdPrimitiveLongList(0) }, { null } };
}

@Test(dataProvider = "Long-Lists-and-null")
public void testOffsetVectorMergeAndSum(List<Long> newVector) {
newVector = MergeUtils.mergeOffsetVectors(newVector, 1L, 0);
newVector = MergeUtils.mergeOffsetVectors(newVector, 2L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 3L, 4);
newVector = MergeUtils.mergeOffsetVectors(newVector, 7L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 8L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 9L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 3L, 5);
List<Long> expectedVector = Arrays.asList(1L, 9L, 0L, 0L, 3L, 3L);
Assert.assertEquals(newVector, expectedVector);
Assert.assertEquals(RmdUtils.sumOffsetVector(newVector), 16L);

newVector.clear();
newVector = MergeUtils.mergeOffsetVectors(newVector, 3L, 5);
newVector = MergeUtils.mergeOffsetVectors(newVector, 9L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 1L, 0);
newVector = MergeUtils.mergeOffsetVectors(newVector, 2L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 3L, 4);
newVector = MergeUtils.mergeOffsetVectors(newVector, 7L, 1);
newVector = MergeUtils.mergeOffsetVectors(newVector, 8L, 1);
expectedVector = Arrays.asList(1L, 8L, 0L, 0L, 3L, 3L);
Assert.assertEquals(newVector, expectedVector);
Assert.assertEquals(RmdUtils.sumOffsetVector(newVector), 15L);
}

private ByteBuffer serialize(GenericRecord record) {
return ByteBuffer.wrap(serializer.serialize(record));
}
@@ -7,10 +7,8 @@
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.DELETED_ELEM_TS_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_NAME;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.antlr.v4.runtime.misc.NotNull;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

@@ -36,11 +34,6 @@ public static RmdTimestampType getRmdTimestampType(Object tsObject) {
}
}

public static long extractOffsetVectorSumFromRmd(GenericRecord replicationMetadataRecord) {
Object offsetVectorObject = replicationMetadataRecord.get(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS);
return sumOffsetVector(offsetVectorObject);
}

public static List<Long> extractOffsetVectorFromRmd(GenericRecord replicationMetadataRecord) {
Object offsetVector = replicationMetadataRecord.get(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS);
if (offsetVector == null) {
@@ -73,67 +66,6 @@ public static List<Long> extractTimestampFromRmd(GenericRecord replicationMetada
}
}

/**
* Returns a summation of all component parts to an offsetVector for vector comparison
* @param offsetVector offsetVector to be summed
* @return the sum of all offset vectors
*/
public static long sumOffsetVector(Object offsetVector) {
if (offsetVector == null) {
return 0L;
}
return ((List<Long>) offsetVector).stream().reduce(0L, Long::sum);
}

/**
* Checks to see if an offset vector has advanced completely beyond some base offset vector or not.
*
* @param baseOffset The vector to compare against.
* @param advancedOffset The vector has should be advanced along.
* @return True if the advancedOffset vector has grown beyond the baseOffset
*/
static public boolean hasOffsetAdvanced(@NotNull List<Long> baseOffset, @NotNull List<Long> advancedOffset) {
for (int i = 0; i < baseOffset.size(); i++) {
if (advancedOffset.size() - 1 < i) {
if (baseOffset.get(i) > 0) {
return false;
}
continue;
}
if (advancedOffset.get(i) < baseOffset.get(i)) {
return false;
} else if (advancedOffset.get(i) > baseOffset.get(i)) {
return true;
}
}
return true;
}

static public List<Long> mergeOffsetVectors(@NotNull List<Long> baseOffset, @NotNull List<Long> advancedOffset) {
List<Long> shortVector;
List<Long> longVector;

if (baseOffset.size() > advancedOffset.size()) {
shortVector = advancedOffset;
longVector = baseOffset;
} else {
shortVector = baseOffset;
longVector = advancedOffset;
}

List<Long> mergedVector = new ArrayList<>(longVector.size());

for (int i = 0; i < shortVector.size(); i++) {
mergedVector.add(Math.max(shortVector.get(i), longVector.get(i)));
}

if (longVector.size() != shortVector.size()) {
mergedVector.addAll(longVector.subList(shortVector.size(), longVector.size()));
}

return mergedVector;
}

static public long getLastUpdateTimestamp(Object object) {
// The replication metadata object contains a single field called "timestamp" which is a union of a long and a
// record.