[controller] Remove V1 admin topic metadata and standardize on V2 format #2465
- Remove all V1 methods, constants, and configuration keys
- Fix AdminConsumptionTask to synchronize numeric offsets with positions
- Add automatic offset sync in AdminMetadata.setPubSubPosition()
- Update all tests to use V2 format (AdminMetadata vs Map<String, Long>)
- Add generateAdminMetadata() helper for V2 test data creation
- System now exclusively uses V2 admin topic metadata format

Fixes test failures caused by V1/V2 data format mismatches.
- Add @VisibleForTesting setter for adminTopicMetadataAccessor in VeniceParentHelixAdmin to allow mocking in tests
- Update AbstractTestVeniceParentHelixAdmin to inject a mock AdminTopicMetadataAccessor instead of relying on zkClient.readData()
- Remove obsolete zkClient.readData() mock setups and verify calls from TestVeniceParentHelixAdmin and TestVeniceParentHelixAdminWithAcl
- Tests now focus on behavior rather than ZK implementation details
Pull request overview
Removes legacy V1 (numeric-offset-based) admin topic metadata handling from the Venice controller and standardizes persistence and reads on the V2 AdminMetadata (PubSubPosition-based) format, with corresponding unit test updates and ZK path/documentation adjustments.
Changes:
- Removed V1 admin topic metadata config/path support and updated controller/ZK accessor code to use only V2 AdminMetadata.
- Updated controller admin-consumption persistence logic to keep numeric offsets aligned with PubSub positions during migration.
- Refactored/updated unit tests and admin-tool ZK path expectations to align with V2-only metadata.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java | Updates tests to use AdminMetadata instead of legacy metadata maps. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java | Updates ZK accessor tests to V2-only AdminMetadata reads/writes. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java | Removes legacy ZK metadata mocking tied to V1. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java | Removes legacy ZK metadata mocking tied to V1. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java | Switches to V2 ZK metadata path and injects a mock metadata accessor for tests. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java | Removes numeric-offset-based failure metric. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java | Removes legacy map conversion and simplifies position/offset handling. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java | Persists V2 metadata and adds logic intended to sync numeric offsets with positions. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java | Removes V1 ZK path/serialization and reads/writes only V2 AdminMetadata. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java | Makes metadata accessor settable for tests via @VisibleForTesting setter. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java | Removes the V2-toggle config field/method (now always V2). |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java | Adds a V2 helper method for generating AdminMetadata. |
| internal/venice-common/src/main/java/com/linkedin/venice/zk/VeniceZkPaths.java | Removes V1 admin topic metadata constant and updates cluster managed-path set. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Removes obsolete USE_V2_ADMIN_TOPIC_METADATA config key. |
| docs/venice-zk-helix-layout.md | Adds ZK/Helix layout documentation including admin topic metadata sections. |
| clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestZkCopier.java | Updates admin-tool tests toward V2 constant usage. |
| clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java | Updates admin-tool tree tests toward V2 constant usage. |
| private static final Set<String> CLUSTER_ZK_PATHS_MODIFIABLE = new HashSet<>( | ||
| Arrays.asList(ADMIN_TOPIC_METADATA, EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES)); | ||
| private static final Set<String> CLUSTER_ZK_PATHS_MODIFIABLE = | ||
| new HashSet<>(Arrays.asList(EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES)); |
Copilot
AI
Feb 8, 2026
CLUSTER_ZK_PATHS no longer includes the admin topic metadata path, but the controller now standardizes on adminTopicMetadataV2. As a result, ZkCopier.buildRequiredPathsTree() and other tooling that relies on CLUSTER_ZK_PATHS will omit admin topic metadata entirely. Add ADMIN_TOPIC_METADATA_V2 to CLUSTER_ZK_PATHS_MODIFIABLE (and update any related migration/validation logic accordingly).
| new HashSet<>(Arrays.asList(EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES)); | |
| new HashSet<>(Arrays.asList(ADMIN_TOPIC_METADATA_V2, EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES)); |
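The effect of the suggestion above can be sketched in isolation. This is a minimal illustration, not the actual VeniceZkPaths or ZkCopier code: the class name, the `isRequiredClusterChild` helper, and every znode name other than `adminTopicMetadataV2` are assumptions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for VeniceZkPaths. Only "adminTopicMetadataV2" comes from
// the review thread; the other znode names are assumed for illustration.
final class ZkPathsSketch {
  static final String ADMIN_TOPIC_METADATA_V2 = "adminTopicMetadataV2";
  static final String EXECUTION_IDS = "executionids";
  static final String PARENT_OFFLINE_PUSHES = "ParentOfflinePushes";
  static final String ROUTERS = "routers";
  static final String STORE_GRAVEYARD = "StoreGraveyard";
  static final String STORES = "Stores";

  private static final Set<String> CLUSTER_ZK_PATHS_MODIFIABLE = new HashSet<>(
      Arrays.asList(ADMIN_TOPIC_METADATA_V2, EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES));

  static final Set<String> CLUSTER_ZK_PATHS = Collections.unmodifiableSet(CLUSTER_ZK_PATHS_MODIFIABLE);

  // Assumed shape of a tooling-side filter: a child znode is kept only if it is listed.
  static boolean isRequiredClusterChild(String znodeName) {
    return CLUSTER_ZK_PATHS.contains(znodeName);
  }

  public static void main(String[] args) {
    System.out.println(isRequiredClusterChild(ADMIN_TOPIC_METADATA_V2));
    System.out.println(isRequiredClusterChild("adminTopicMetadata"));
  }
}
```

With the V2 constant in the set, any tool that filters cluster children through the set keeps the V2 metadata node and ignores the removed V1 name.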
| private void testContainsChildAsserts(TreeNode child) { | ||
| Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA)); | ||
| Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA_V2)); | ||
| Assert.assertTrue(child.containsChild(EXECUTION_IDS)); |
Copilot
AI
Feb 8, 2026
This test mixes the V2 znode constant (ADMIN_TOPIC_METADATA_V2) with hard-coded V1 path strings (.../adminTopicMetadata...) in both the input path list and assertions. With V1 removed, these should be updated to .../adminTopicMetadataV2... (and the expected extracted paths/resources updated accordingly), otherwise the test is validating the wrong ZK layout.
| @@ -89,7 +89,7 @@ private TreeNode buildTestTree() { | |||
| TreeNode root = new TreeNode(BASE_PATH); | |||
| root.addChild(STORE_CONFIGS); | |||
| TreeNode cluster1 = root.addChild(CLUSTER_1); | |||
| TreeNode adminTopicMetadata = cluster1.addChild(ADMIN_TOPIC_METADATA); | |||
| TreeNode adminTopicMetadata = cluster1.addChild(ADMIN_TOPIC_METADATA_V2); | |||
| adminTopicMetadata.addChild("file1"); | |||
Copilot
AI
Feb 8, 2026
buildTestTree() now creates the admin topic metadata node using ADMIN_TOPIC_METADATA_V2, but testPathsTreeToList() and getPaths() still assert/build paths containing .../adminTopicMetadata... (V1). Update those strings to .../adminTopicMetadataV2... so the test matches the new tree structure.
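One way to keep such expectations from drifting is to derive the expected path strings from the same constant the tree is built with. The following is only a sketch: the class, the `clusterChildPath` helper, and the `BASE_PATH` and `CLUSTER_1` values are assumed, not taken from TestTreeNode.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical test fixture; BASE_PATH and CLUSTER_1 values are assumptions.
final class ExpectedPathsSketch {
  static final String ADMIN_TOPIC_METADATA_V2 = "adminTopicMetadataV2";
  static final String BASE_PATH = "/venice";
  static final String CLUSTER_1 = "cluster1";

  // Builds <base>/<cluster>/<child>/<leaf> from constants instead of a literal string.
  static String clusterChildPath(String child, String leaf) {
    return String.join("/", BASE_PATH, CLUSTER_1, child, leaf);
  }

  // Expected paths for the admin topic metadata subtree built in the test tree.
  static List<String> expectedAdminMetadataPaths() {
    return Collections.singletonList(clusterChildPath(ADMIN_TOPIC_METADATA_V2, "file1"));
  }

  public static void main(String[] args) {
    System.out.println(expectedAdminMetadataPaths());
  }
}
```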
| // Set numeric offsets to keep them in sync with positions | ||
| if (localPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) { | ||
| adminMetadata.setOffset(((ApacheKafkaOffsetPosition) localPositionCheckpointAtStartTime).getNumericOffset()); | ||
| } | ||
| if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) { | ||
| adminMetadata.setUpstreamOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset()); | ||
| } | ||
| } else { | ||
| adminMetadata.setPubSubPosition(lastDelegatedPosition); | ||
| adminMetadata.setUpstreamPubSubPosition(upstreamPositionCheckpointAtStartTime); | ||
| // Set numeric offsets to keep them in sync with positions | ||
| if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) { | ||
| adminMetadata.setOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset()); | ||
| } | ||
| if (upstreamPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) { | ||
| adminMetadata.setUpstreamOffset( | ||
| ((ApacheKafkaOffsetPosition) upstreamPositionCheckpointAtStartTime).getNumericOffset()); | ||
| } |
Copilot
AI
Feb 8, 2026
The new numeric offset syncing is redundant: AdminMetadata#setPubSubPosition() and #setUpstreamPubSubPosition() already set offset/upstreamOffset from the provided PubSubPosition. Keeping both paths increases the chance of divergence (and makes the intent harder to follow). Prefer relying on the setters’ existing synchronization logic (or centralize the sync in one place).
| // Set numeric offsets to keep them in sync with positions | |
| if (localPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) { | |
| adminMetadata.setOffset(((ApacheKafkaOffsetPosition) localPositionCheckpointAtStartTime).getNumericOffset()); | |
| } | |
| if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) { | |
| adminMetadata.setUpstreamOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset()); | |
| } | |
| } else { | |
| adminMetadata.setPubSubPosition(lastDelegatedPosition); | |
| adminMetadata.setUpstreamPubSubPosition(upstreamPositionCheckpointAtStartTime); | |
| // Set numeric offsets to keep them in sync with positions | |
| if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) { | |
| adminMetadata.setOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset()); | |
| } | |
| if (upstreamPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) { | |
| adminMetadata.setUpstreamOffset( | |
| ((ApacheKafkaOffsetPosition) upstreamPositionCheckpointAtStartTime).getNumericOffset()); | |
| } | |
| } else { | |
| adminMetadata.setPubSubPosition(lastDelegatedPosition); | |
| adminMetadata.setUpstreamPubSubPosition(upstreamPositionCheckpointAtStartTime); |
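The centralization the comment asks for can be sketched as follows. All types here are simplified, hypothetical stand-ins for PubSubPosition, ApacheKafkaOffsetPosition, and AdminMetadata; the `-1` sentinel and the choice to reset the offset for non-Kafka positions are assumptions, not the behavior of the real classes.

```java
// Hypothetical, simplified stand-ins; the real Venice classes differ.
interface PositionSketch {}

final class KafkaOffsetPositionSketch implements PositionSketch {
  private final long offset;

  KafkaOffsetPositionSketch(long offset) {
    this.offset = offset;
  }

  long getNumericOffset() {
    return offset;
  }
}

final class AdminMetadataSketch {
  static final long UNDEFINED_OFFSET = -1L; // assumed sentinel value

  private PositionSketch position;
  private long offset = UNDEFINED_OFFSET;
  private PositionSketch upstreamPosition;
  private long upstreamOffset = UNDEFINED_OFFSET;

  // The setters are the single place where numeric offsets are derived from positions.
  void setPubSubPosition(PositionSketch position) {
    this.position = position;
    this.offset = toNumericOffset(position);
  }

  void setUpstreamPubSubPosition(PositionSketch position) {
    this.upstreamPosition = position;
    this.upstreamOffset = toNumericOffset(position);
  }

  // Kafka-style positions expose a numeric offset; anything else maps to the sentinel.
  private static long toNumericOffset(PositionSketch position) {
    return position instanceof KafkaOffsetPositionSketch
        ? ((KafkaOffsetPositionSketch) position).getNumericOffset()
        : UNDEFINED_OFFSET;
  }

  PositionSketch getPosition() {
    return position;
  }

  PositionSketch getUpstreamPosition() {
    return upstreamPosition;
  }

  long getOffset() {
    return offset;
  }

  long getUpstreamOffset() {
    return upstreamOffset;
  }
}
```

With this shape, a caller only invokes the two position setters and needs no `instanceof` checks of its own, so the offset and position fields cannot be updated by two separate code paths.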
| AdminMetadata metaData = adminTopicMetadataAccessor.getMetadata(clusterName); | ||
| return AdminTopicMetadataAccessor.getExecutionId(metaData) == executionId; |
Copilot
AI
Feb 8, 2026
These tests no longer assert anything about the numeric offset fields, but persistAdminTopicMetadata() now explicitly syncs offsets for backward compatibility. Add an assertion in the wait condition (or an additional assertion) that the persisted AdminMetadata has offset/upstreamOffset consistent with the persisted position/upstreamPosition so the migration behavior is covered by unit tests.
| ``` | ||
| /venice/<ClusterName>/ | ||
| ├── adminTopicMetadata # Admin topic consumption offset (V1 format) |
Copilot
AI
Feb 8, 2026
This new doc still describes /adminTopicMetadata (V1) as part of the active cluster layout and includes a full V1 section. Since this PR removes V1 support and even removes the ADMIN_TOPIC_METADATA constant, the doc should be updated to either (a) remove V1 entirely, or (b) clearly mark it as a historical/removed path that is no longer read/written by current code and should not be used.
| ├── adminTopicMetadata # Admin topic consumption offset (V1 format) | |
| ├── adminTopicMetadata # LEGACY (V1) admin topic metadata path; no longer read/written by current code and should not be used |
| private final VeniceHelixAdmin veniceHelixAdmin; | ||
| private final Map<String, VeniceWriter<byte[], byte[], byte[]>> veniceWriterMap; | ||
| private final AdminTopicMetadataAccessor adminTopicMetadataAccessor; | ||
| private AdminTopicMetadataAccessor adminTopicMetadataAccessor; |
Copilot
AI
Feb 8, 2026
adminTopicMetadataAccessor was changed from final to mutable with a test-only setter. This weakens safe publication/immutability guarantees and makes accidental reassignment in production possible. Prefer keeping the field final and injecting a test accessor via a @VisibleForTesting constructor/Factory, or at least make the field volatile and guard the setter (e.g., only allow setting once, non-null).
| private AdminTopicMetadataAccessor adminTopicMetadataAccessor; | |
| private volatile AdminTopicMetadataAccessor adminTopicMetadataAccessor; |
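The constructor-injection alternative mentioned in the comment might look like the sketch below. The interface, class, and method names are hypothetical, and the no-argument constructor uses a placeholder where production code would build the ZK-backed accessor.

```java
import java.util.Objects;

// Hypothetical minimal accessor; the real AdminTopicMetadataAccessor has a different API.
interface MetadataAccessorSketch {
  long getExecutionId(String clusterName);
}

final class ParentAdminSketch {
  // The field stays final, so it is safely published and cannot be reassigned later.
  private final MetadataAccessorSketch accessor;

  // Production path: a placeholder lambda stands in for the ZK-backed accessor.
  ParentAdminSketch() {
    this(clusterName -> 0L);
  }

  // Test path: package-private constructor that accepts a mock or fake accessor.
  ParentAdminSketch(MetadataAccessorSketch accessor) {
    this.accessor = Objects.requireNonNull(accessor, "accessor");
  }

  boolean isLastExecution(String clusterName, long executionId) {
    return accessor.getExecutionId(clusterName) == executionId;
  }
}
```

A test can then construct `new ParentAdminSketch(cluster -> 7L)` and exercise behavior without any setter on the production class.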
| /** | ||
| * Generate AdminMetadata object with specified values (V2 format) | ||
| */ | ||
| public static AdminMetadata generateAdminMetadata( | ||
| Optional<Long> localOffset, | ||
| Optional<Long> upstreamOffset, | ||
| Optional<Long> executionId, | ||
| Optional<Long> adminOperationProtocolVersion) { | ||
| AdminMetadata metadata = new AdminMetadata(); | ||
| executionId.ifPresent(metadata::setExecutionId); | ||
| localOffset.ifPresent(metadata::setOffset); | ||
| upstreamOffset.ifPresent(metadata::setUpstreamOffset); | ||
| adminOperationProtocolVersion.ifPresent(metadata::setAdminOperationProtocolVersion); | ||
| return metadata; | ||
| } |
Copilot
AI
Feb 8, 2026
generateAdminMetadata() currently only sets numeric offsets (and execution/protocol), but not the corresponding position/upstreamPosition fields. That can produce partially-populated V2 metadata objects where positions fall back to EARLIEST even when offsets are set (depending on how callers use raw fields vs getters). Also, this helper (and the existing generateMetadataMap) appears unused in the repo; consider either removing unused helpers or updating generateAdminMetadata() to set positions via setPubSubPosition/setUpstreamPubSubPosition so offsets and positions stay consistent by construction.
Add unit test to verify numeric offset extraction from ApacheKafkaOffsetPosition, covering the new code paths added for synchronizing numeric offsets with PubSubPositions.
- Extract syncNumericOffsetFromPosition() as a static utility method that can be directly unit tested
- Add comprehensive tests covering Kafka positions, non-Kafka positions, and null positions
- This provides proper branch coverage for the instanceof checks
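
The signature of syncNumericOffsetFromPosition() is not shown in this thread, so the following is only a sketch of the general idea under assumed names: a static helper (`numericOffsetOf`, hypothetical) that extracts a numeric offset from a Kafka-style position and returns an empty result for non-Kafka and null positions. The position types are simplified stand-ins.

```java
import java.util.OptionalLong;

// Hypothetical stand-ins for PubSubPosition and ApacheKafkaOffsetPosition.
interface PubSubPositionSketch {}

final class KafkaPositionSketch implements PubSubPositionSketch {
  private final long offset;

  KafkaPositionSketch(long offset) {
    this.offset = offset;
  }

  long getNumericOffset() {
    return offset;
  }
}

final class OffsetSyncSketch {
  // Static and side-effect free, so each branch can be unit tested directly.
  static OptionalLong numericOffsetOf(PubSubPositionSketch position) {
    if (position instanceof KafkaPositionSketch) {
      return OptionalLong.of(((KafkaPositionSketch) position).getNumericOffset());
    }
    // instanceof is false for null, so null and non-Kafka positions both land here.
    return OptionalLong.empty();
  }

  public static void main(String[] args) {
    PubSubPositionSketch nonKafka = new PubSubPositionSketch() {};
    System.out.println(numericOffsetOf(new KafkaPositionSketch(42L)).isPresent());
    System.out.println(numericOffsetOf(nonKafka).isPresent());
    System.out.println(numericOffsetOf(null).isPresent());
  }
}
```

The three calls in `main` correspond to the three cases named in the commit message: Kafka position, non-Kafka position, and null.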
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
| private void testContainsChildAsserts(TreeNode child) { | ||
| Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA)); | ||
| Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA_V2)); | ||
| Assert.assertTrue(child.containsChild(EXECUTION_IDS)); | ||
| Assert.assertTrue(child.containsChild(PARENT_OFFLINE_PUSHES)); | ||
| Assert.assertTrue(child.containsChild(ROUTERS)); |
Copilot
AI
Feb 8, 2026
This test now asserts the required cluster children include ADMIN_TOPIC_METADATA_V2, but the rest of the test data/expectations still hard-code the V1 path name adminTopicMetadata (e.g., in testVenicePathsContainsAsserts() and getPaths()). Once CLUSTER_ZK_PATHS includes the V2 node, these assertions will fail and/or stop covering the right path. Update the hard-coded path strings and expected counts to use adminTopicMetadataV2 consistently.
| @@ -89,7 +89,7 @@ private TreeNode buildTestTree() { | |||
| TreeNode root = new TreeNode(BASE_PATH); | |||
| root.addChild(STORE_CONFIGS); | |||
| TreeNode cluster1 = root.addChild(CLUSTER_1); | |||
| TreeNode adminTopicMetadata = cluster1.addChild(ADMIN_TOPIC_METADATA); | |||
| TreeNode adminTopicMetadata = cluster1.addChild(ADMIN_TOPIC_METADATA_V2); | |||
| adminTopicMetadata.addChild("file1"); | |||
Copilot
AI
Feb 8, 2026
buildTestTree() switched to ADMIN_TOPIC_METADATA_V2, but testPathsTreeToList() and getPaths() still assert/build paths under the V1 name adminTopicMetadata. This makes the test internally inconsistent and it will break once the required/canonical cluster path is fully standardized on V2. Update the remaining hard-coded strings to adminTopicMetadataV2 (and adjust expected list size if needed).
| adminConsumeFailCountSensor = registerSensor("failed_admin_messages", new Count()); | ||
| adminConsumeFailRetriableMessageCountSensor = registerSensor("failed_retriable_admin_messages", new Count()); | ||
| adminTopicDIVErrorReportCountSensor = registerSensor("admin_message_div_error_report_count", new Count()); | ||
| registerSensor( | ||
| new AsyncGauge( | ||
| (ignored, ignored2) -> adminConsumptionFailedPosition == null | ||
| ? 0L | ||
| : adminConsumptionFailedPosition.getNumericOffset(), | ||
| "failed_admin_message_offset")); | ||
| adminConsumptionCycleDurationMsSensor = | ||
| registerSensor("admin_consumption_cycle_duration_ms", new Avg(), new Min(), new Max()); | ||
| registerSensor( |
Copilot
AI
Feb 8, 2026
The failed_admin_message_offset gauge was removed, but adminConsumptionFailedPosition (and its setter) now appear to be unused for metrics publication. If this metric is truly being retired, consider also removing adminConsumptionFailedPosition and the setAdminConsumptionFailedPosition(...) plumbing to avoid carrying dead state through AdminConsumptionTask.
| /** | ||
| * Generate AdminMetadata object with specified values (V2 format) | ||
| */ | ||
| public static AdminMetadata generateAdminMetadata( | ||
| Optional<Long> localOffset, | ||
| Optional<Long> upstreamOffset, | ||
| Optional<Long> executionId, | ||
| Optional<Long> adminOperationProtocolVersion) { | ||
| AdminMetadata metadata = new AdminMetadata(); | ||
| executionId.ifPresent(metadata::setExecutionId); | ||
| localOffset.ifPresent(metadata::setOffset); | ||
| upstreamOffset.ifPresent(metadata::setUpstreamOffset); | ||
| adminOperationProtocolVersion.ifPresent(metadata::setAdminOperationProtocolVersion); | ||
| return metadata; | ||
| } |
Copilot
AI
Feb 8, 2026
generateAdminMetadata(...) is newly added but currently unused in the codebase, and the existing legacy generateMetadataMap(...) also appears unused. Since the PR goal is to standardize on V2, consider either (a) updating call sites/tests to use generateAdminMetadata(...) and deleting the legacy map helper, or (b) removing both helpers if they’re no longer needed to avoid accumulating dead APIs.
| "offset": 500, (being deprecated so avoid using id) | ||
| "upstreamOffset": 88998324343, (being deprecated so avoid using id) |
Copilot
AI
Feb 8, 2026
Typo in the V2 example JSON: the note says "avoid using id" but should be "avoid using it" (referring to the deprecated numeric offsets).
| "offset": 500, (being deprecated so avoid using id) | |
| "upstreamOffset": 88998324343, (being deprecated so avoid using id) | |
| "offset": 500, (being deprecated so avoid using it) | |
| "upstreamOffset": 88998324343, (being deprecated so avoid using it) |
- Update test resource files to use adminTopicMetadataV2
- Add ADMIN_TOPIC_METADATA_V2 to CLUSTER_ZK_PATHS set
- Update hardcoded path assertions in tests
delete this file as it's not required
mynameborat
left a comment
Changes look fine.
One question about renaming occurrences of AdminTopicMetadataV2 to just AdminTopicMetadata, since some of the accessors and related code already refer to plain metadata.
Do you think it's okay to rename everything to AdminTopicMetadata, given that we have cleaned up and fully ramped?
| @@ -1,20 +1,15 @@ | |||
| package com.linkedin.venice.controller; | |||
What about cleaning up the data in ZooKeeper?
| @Test | ||
| public void testAddStore() { | ||
| when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) |
Thanks for the clean up
Problem Statement
The Venice controller currently supports both V1 (numeric offsets) and V2 (PubSubPosition-based) admin topic metadata formats. This dual support adds complexity and is no longer needed since V2 has been stable.
Solution
Remove all V1 admin topic metadata code and standardize exclusively on V2 format.
Code changes
- USE_V2_ADMIN_TOPIC_METADATA config key (no longer needed)
- ADMIN_TOPIC_METADATA constant for V1 path
- isUseV2AdminTopicMetadata() method
- AdminMetadata
- failed_admin_message_offset metric that exposed numeric offset
- generateAdminMetadata() helper for V2 format
- @VisibleForTesting setter for adminTopicMetadataAccessor
Test changes
- AdminTopicMetadataAccessor instead of mocking zkClient.readData()
Code changes
Concurrency-Specific Checks
- synchronized, RWLock) are used where needed.
- ConcurrentHashMap, CopyOnWriteArrayList).
How was this PR tested?
All 160+ controller unit tests pass.
Does this PR introduce any user-facing or breaking changes?
This is a cleanup PR that removes deprecated V1 admin topic metadata support. Systems should already be using V2 format.
Based on PR #2297 by @haoxu07, rebased and fixed test failures