[server][dvc] Config-gate collection-merge element replacement for order:ignore fields#2846
Open
sixpluszero wants to merge 1 commit into
Open
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses a correctness gap in Active/Active write-compute collection merge for SET_UNION on array-of-record fields when record equality/comparison ignores order: ignore fields, causing “content-only” updates to be silently dropped. It introduces an opt-in server config that allows the incoming element to replace the stored element (and adds a deterministic tie-break for equal timestamps) so ignored-field content can still propagate and converge across colos.
Changes:
- Add config-gated element replacement + deterministic equal-timestamp tie-break in
SortBasedCollectionFieldOpHandlerfor A/A collection merge. - Thread a new server config flag (
server.aa.collection.field.element.replacement.enabled, defaultfalse) through server config → ingestion task → merge conflict resolver → write-compute processor/handler. - Add unit + integration coverage for the
order: ignorefield scenario.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-test-common/src/integrationTest/resources/PartialUpdateRecordListWithIgnoredField.avsc | Adds an integration-test value schema with an array-of-record field containing an order: ignore field. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPartialUpdateWithActiveActiveReplication.java | Enables the new server flag for the suite and adds an end-to-end A/A replication test validating element replacement. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Introduces the new server config key constant and documents its behavior and default. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldElementReplacementTest.java | Adds focused unit tests for legacy vs enabled behavior and the equal-timestamp tie-break. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/writecompute/WriteComputeProcessor.java | Adds a constructor overload to pass the new flag down to the handler. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/writecompute/WriteComputeHandlerV2.java | Wires the flag into SortBasedCollectionFieldOpHandler construction. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java | Implements config-gated remove-then-put replacement on newer timestamps and full-content tie-break on equal timestamps. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/AvroCollectionElementComparator.java | Adds a “full comparison” comparator instance that includes order: ignore fields for deterministic tie-breaks. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolverFactory.java | Adds a backward-compatible overload to pass the new flag into WriteComputeProcessor. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java | Plumbs the server-config flag into merge conflict resolver creation in the A/A ingestion path. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Reads the new config key (default false) and exposes a getter. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
order:ignore fields During Active/Active write-compute collection merge (SET_UNION), an incoming array element that compares equal to an existing element only advanced the element's RMD timestamp and kept the stored object. When the array-element record has a field marked order:ignore, the incoming element's content in that field was silently dropped, because Avro comparison (and GenericRecord equals/hashCode) excludes order:ignore fields from element identity. Add element replacement in SortBasedCollectionFieldOpHandler, gated by the new server config server.aa.collection.field.element.replacement.enabled (default false for controlled rollout): - newer timestamp: replace the stored element with the incoming one - equal timestamp: deterministic tie-break by full element content (including order:ignore fields) so Active/Active colos converge Thread the flag from VeniceServerConfig through MergeConflictResolverFactory -> WriteComputeProcessor -> WriteComputeHandlerV2 -> SortBasedCollectionFieldOpHandler. Add AvroCollectionElementComparator.FULL_COMPARISON_INSTANCE (includes order:ignore fields) for the equal-timestamp tie-break. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1bbf9f0 to
156f8ce
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem Statement
During Active/Active (A/A) write-compute collection merge, a
SET_UNION(add-to-set) on an array field that re-adds an element which is comparison-equal to an already-stored element only advances that element's per-element replication-metadata (RMD) timestamp — it keeps the stored object. Element identity isGenericRecordequals/hashCode, which (like Avro'sGenericDatacomparison) excludes fields markedorder: ignore.As a result, when the array-element record has an
order: ignorefield, an incoming element with the same identity but different content in the ignored field is treated as "already present", and the incoming content is silently dropped — the write appears applied (RMD timestamp moves, status isPARTIALLY_UPDATED) but the visible value never changes.The actual entry point for this path is
WriteComputeHandlerV2.handleModifyList→SortBasedCollectionFieldOpHandler.handleModifyCollectionMergeList, where the per-element map (IndexedHashMap) keeps the existing key on a collidingputand only overwrites the timestamp value.Solution
Add element replacement in
SortBasedCollectionFieldOpHandler, gated by a new server config (defaultfalse):existingTs < modifyTs): remove-then-put so the incoming element replaces the stored one, propagatingorder:ignore-field content.existingTs == modifyTs): deterministically break the tie by a full element comparison that includesorder:ignorefields (AvroCollectionElementComparator.FULL_COMPARISON_INSTANCE), so all A/A colos converge on the same element. The default comparator excludes ignored fields and would return 0 for exactly this case, so a dedicated comparison instance is required.When the flag is
false, behavior is byte-for-byte the legacy behavior. The flag is threadedVeniceServerConfig→ActiveActiveStoreIngestionTask→MergeConflictResolverFactory→WriteComputeProcessor→WriteComputeHandlerV2→SortBasedCollectionFieldOpHandlervia backward-compatible constructor/factory overloads (no existing caller changed). Scope is the A/A merge path only; the non-A/A leader write-compute path is unchanged (single-writer, no conflict).Code changes
server.aa.collection.field.element.replacement.enabled, defaultfalse.Concurrency-Specific Checks
final booleanset at construction; the handler and comparators are already@ThreadSafeand stateless per request.How was this PR tested?
SortBasedCollectionFieldElementReplacementTest— array-of-record schema with anorder:ignorefield; covers flag-off (legacy: content dropped, timestamp advanced), flag-on newer-timestamp (replaced), and flag-on equal-timestamp tie-break in both directions.TestPartialUpdateWithActiveActiveReplication#testAAReplicationForCollectionElementReplacementWithIgnoredField— A/A + write-compute store, twoSET_UNIONupdates that differ only in theorder:ignorefield; verifies both regions converge to the newer content with the config enabled. Passed (35.9s).schema.merge,schema.writecompute, andreplication.mergesuites pass unchanged (default flag-off path).Does this PR introduce any user-facing or breaking changes?
order: ignore.