Skip to content

Conversation

@sushantmane
Copy link
Contributor

Summary

  • Stop writing data into the replication_checkpoint_vector field in RMD records since PubSubPosition.getNumericOffset() is deprecated and slated for removal
  • Remove utility methods that are no longer needed: MergeUtils.mergeOffsetVectors, RmdUtils.extractOffsetVectorSumFromRmd, RmdUtils.sumOffsetVector, RmdUtils.hasOffsetAdvanced, RmdUtils.mergeOffsetVectors
  • Remove the offset regression check in ActiveActiveStoreIngestionTask.validatePostOperationResultsAndRecord
  • Simplify VeniceChangelogConsumerImpl by removing dead hasOffsetAdvanced guard

Backward/Forward Compatibility

  • New writer + old reader: New servers write empty vectors. Old servers read them. extractOffsetVectorSumFromRmd returns 0 for empty vectors. The regression check only compares pre vs post within a single server's merge cycle — no cross-server issue.
  • Old writer + new reader: Old servers still populate vectors. New servers don't read them (removed methods). No conflict.
  • RecordChangeEvent: Empty vectors are valid per the Avro schema default ([]). The only internal consumer (filterRecordByVersionSwapHighWatermarks) is documented as dead code.
  • Version swap watermarks: Independent data path; uses VersionSwap control message, not RMD checkpoint vector.

Test plan

  • TestMergeWithValueLevelTimestamp — all tests pass (removed testOffsetVectorMergeAndSum)
  • TestRmdUtils — all tests pass (removed testHasOffsetAdvanced, testExtractOffsetVectorSumFromRmd)
  • Spotless formatting applied
  • Compilation verified

offset-vector utility methods

Stop writing data into the replication_checkpoint_vector field in RMD records
since PubSubPosition.getNumericOffset() is deprecated and slated for removal.
The Avro schema for the field (array of long) stays unchanged — we simply stop
populating it and remove the utility methods that read it.

Changes:
- AbstractMerge/MergeGenericRecord: updateReplicationCheckpointVector now
writes
  an empty list instead of calling MergeUtils.mergeOffsetVectors
- MergeConflictResolver: putWithoutRmd/deleteWithoutRmd write empty list
- MergeUtils: delete mergeOffsetVectors method (no remaining callers)
- ActiveActiveStoreIngestionTask: remove offset regression check and
  offsetSumPreOperation variable
- RmdUtils: delete extractOffsetVectorSumFromRmd, sumOffsetVector,
  hasOffsetAdvanced, and mergeOffsetVectors methods
- VeniceChangelogConsumerImpl: remove hasOffsetAdvanced guard, always update
  currentVersionHighWatermarks; simplify dead
filterRecordByVersionSwapHighWatermarks
- Remove tests for deleted methods
Copilot AI review requested due to automatic review settings February 12, 2026 11:45
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR removes offset-vector population and related utilities from Da Vinci’s merge/ingestion paths in preparation for PubSubPosition.getNumericOffset() deprecation, and simplifies downstream code that depended on offset-vector comparisons.

Changes:

  • Stop writing to replication_checkpoint_vector in newly generated/updated RMD records and remove offset-vector merge/sum utilities.
  • Remove the offset regression validation from ActiveActiveStoreIngestionTask and related unit tests.
  • Simplify VeniceChangelogConsumerImpl by removing the hasOffsetAdvanced guard and making the version-swap record filter a no-op.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java Removes tests for deleted offset-vector helpers.
internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java Removes offset-vector sum/merge/advance helper methods.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeWithValueLevelTimestamp.java Removes tests covering offset-vector merge/sum behavior.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeUtils.java Removes the offset-vector merge utility, leaving only compare helper.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeGenericRecord.java Updates merge flow to stop updating checkpoint vectors using source offsets.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java Initializes checkpoint vector as empty for new RMD creation paths.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/AbstractMerge.java Changes checkpoint vector update to unconditional empty list assignment.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java Removes offset-regression validation; leaves now-dead validation scaffolding.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java Removes offset-advance guard for version swap HWMs; filter method is now no-op.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 116 to 120
GenericRecord newValue) {
final GenericRecord oldReplicationMetadata = oldValueAndRmd.getRmd();
final GenericRecord oldValue = oldValueAndRmd.getValue();
updateReplicationCheckpointVector(oldReplicationMetadata, sourcePositionOfNewValue, newValueSourceBrokerID);
updateReplicationCheckpointVector(oldReplicationMetadata);

Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateReplicationCheckpointVector(oldReplicationMetadata) is called before any per-field merge happens, so it mutates the RMD even if all field updates are later ignored (noFieldUpdated). Consider only updating the checkpoint vector when at least one field update is actually applied.

Copilot uses AI. Check for mistakes.
Comment on lines 576 to 579
GenericRecord newRmd = newRmdCreator.apply(newValueSchemaID);
newRmd.put(TIMESTAMP_FIELD_POS, putOperationTimestamp);
// A record which didn't come from an RT topic or has null metadata should have no offset vector.
newRmd.put(
REPLICATION_CHECKPOINT_VECTOR_FIELD_POS,
MergeUtils.mergeOffsetVectors(null, newValueSourcePosition.getNumericOffset(), newValueSourceBrokerID));
newRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());

Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes newly created RMDs always store an empty replication_checkpoint_vector. There are existing tests that assert this vector is populated (e.g., internal/venice-test-common/src/integrationTest/.../TestSeparateRealtimeTopicIngestion checks offset vector size/values). Please update those expectations or adjust the change to avoid breaking test coverage that still validates checkpoint vectors.

Copilot uses AI. Check for mistakes.
Comment on lines 498 to 500
List<Long> recordTimestampsPreOperation = rmdWithValueSchemaID != null
? RmdUtils.extractTimestampFromRmd(rmdWithValueSchemaID.getRmdRecord())
: Collections.singletonList(0L);
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recordTimestampsPreOperation is now only computed to be passed into validatePostOperationResultsAndRecord, but the validation method no longer uses it. Consider removing this extraction (and possibly the validation call) to avoid unnecessary per-record work.

Suggested change
List<Long> recordTimestampsPreOperation = rmdWithValueSchemaID != null
? RmdUtils.extractTimestampFromRmd(rmdWithValueSchemaID.getRmdRecord())
: Collections.singletonList(0L);

Copilot uses AI. Check for mistakes.
Comment on lines +1554 to +1557
currentVersionHighWatermarks
.putIfAbsent(pubSubTopicPartition.getPartitionNumber(), new ConcurrentHashMap<>());
currentVersionHighWatermarks.get(pubSubTopicPartition.getPartitionNumber())
.put(upstreamPartition, highWatermarkOffsets);
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The regression-prevention logic in this method is now inconsistent: localOffset is still computed (and the surrounding comments mention preventing high-watermark regressions), but the hasOffsetAdvanced guard was removed and the map is overwritten unconditionally. Either remove the stale localOffset/comment block, or restore a monotonicity check (e.g., via DiffValidationUtils.hasOffsetAdvanced) before updating currentVersionHighWatermarks.

Copilot uses AI. Check for mistakes.
// Nothing was applied, no harm no foul
if (mergeConflictResult.isUpdateIgnored()) {
return;
}
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validatePostOperationResultsAndRecord has become a no-op: if the update is not ignored, the method still performs no validation or metrics/logging. If the offset/timestamp checks are intentionally removed, consider deleting this method and its call site to avoid dead code; otherwise, keep the remaining validation here.

Suggested change
}
}
// Basic sanity checks and diagnostics for applied updates.
if (timestampsPreOperation == null) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("timestampsPreOperation is null while processing a non-ignored merge result: {}", mergeConflictResult);
}
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Validated post-operation merge result. Update applied: {}, pre-operation timestamps count: {}",
!mergeConflictResult.isUpdateIgnored(),
timestampsPreOperation.size());
}

Copilot uses AI. Check for mistakes.
newValueSourcePosition.getNumericOffset(),
newValueSourceBrokerID));
protected void updateReplicationCheckpointVector(GenericRecord oldRmd) {
oldRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateReplicationCheckpointVector currently overwrites any existing replication_checkpoint_vector with a new empty ArrayList on every applied operation. This will actively wipe non-empty vectors written by older servers and also allocates a new list per merge. If the goal is to stop populating the vector, consider leaving the existing field untouched (or only initializing it to empty when it is null/missing) and prefer Collections.emptyList() if you just need an empty value.

Suggested change
oldRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<>());
Object existingVector = oldRmd.get(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS);
if (existingVector == null) {
oldRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, java.util.Collections.emptyList());
}

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant