diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index aa3ce1857d2..e1bca6e6eb0 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -597,6 +597,9 @@ public static void main(String[] args) throws Exception { case CLUSTER_BATCH_TASK: clusterBatchTask(cmd); break; + case UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION: + updateAdminOperationProtocolVersion(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -3270,6 +3273,16 @@ private static void dumpHostHeartbeat(CommandLine cmd) throws Exception { } } + private static void updateAdminOperationProtocolVersion(CommandLine cmd) throws Exception { + String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); + String protocolVersionInString = + getRequiredArgument(cmd, Arg.ADMIN_OPERATION_PROTOCOL_VERSION, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); + long protocolVersion = + Utils.parseLongFromString(protocolVersionInString, Arg.ADMIN_OPERATION_PROTOCOL_VERSION.name()); + ControllerResponse response = controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion); + printObject(response); + } + private static void migrateVeniceZKPaths(CommandLine cmd) throws Exception { Set clusterNames = Utils.parseCommaSeparatedStringToSet(getRequiredArgument(cmd, Arg.CLUSTER_LIST)); String srcZKUrl = getRequiredArgument(cmd, Arg.SRC_ZOOKEEPER_URL); diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 05603f884c4..4cda268e058 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -297,7 +297,10 @@ public enum Arg { ), DAVINCI_HEARTBEAT_REPORTED( "dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats" - ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"); + ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"), + ADMIN_OPERATION_PROTOCOL_VERSION( + "admin-operation-protocol-version", "aopv", true, "Admin operation protocol version" + ); private final String argName; private final String first; diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index abdf6089278..fb77d377ff0 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.Arg.ACCESS_CONTROL; import static com.linkedin.venice.Arg.ACL_PERMS; import static com.linkedin.venice.Arg.ACTIVE_ACTIVE_REPLICATION_ENABLED; +import static com.linkedin.venice.Arg.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.Arg.ALLOW_STORE_MIGRATION; import static com.linkedin.venice.Arg.AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED; import static com.linkedin.venice.Arg.BACKUP_FOLDER; @@ -582,6 +583,10 @@ public enum Command { "dump-host-heartbeat", "Dump all heartbeat belong to a certain storage node. You can use topic/partition to filter specific resource, and you can choose to filter resources that are lagging.", new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED } + ), + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION( + "update-admin-operation-protocol-version", "Update the admin operation protocol version", + new Arg[] { URL, CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION } ); private final String commandName; diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index 17e72096942..9069052669c 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -412,4 +412,11 @@ public void testAdminConfigureView() throws ParseException, IOException { CommandLine finalCommandLine = commandLine; Assert.assertThrows(() -> AdminTool.getConfigureStoreViewQueryParams(finalCommandLine)); } + + @Test + public void testUpdateAdminOperationProtocolVersionWithInvalidInput() { + String[] args = { "--update-admin-operation-protocol-version", "--url", "http://localhost:7036", "--cluster", + "test-cluster", "--admin-operation-protocol-version", "thisShouldBeLongValue" }; + Assert.assertThrows(VeniceException.class, () -> AdminTool.main(args)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java index 34264f1afd4..c014ef863f6 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java @@ -18,6 +18,11 @@ public class AdminTopicMetadataResponse extends ControllerResponse { */ private long upstreamOffset = -1; + /** + * The current admin operation protocol version, which is cluster-level and be SOT for serialize/deserialize admin operation message + */ + private long adminOperationProtocolVersion = -1; + public long getExecutionId() { return executionId; } @@ -41,4 +46,12 @@ public void setOffset(long offset) { public void setUpstreamOffset(long upstreamOffset) { this.upstreamOffset = upstreamOffset; } + + public void setAdminOperationProtocolVersion(long adminOperationProtocolVersion) { + this.adminOperationProtocolVersion = adminOperationProtocolVersion; + } + + public long getAdminOperationProtocolVersion() { + return adminOperationProtocolVersion; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 7dddb841afe..498efdb7a96 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -225,6 +225,7 @@ public class ControllerApiConstants { public static final String KAFKA_TOPIC_RETENTION_IN_MS = "kafka.topic.retention.in.ms"; public static final String KAFKA_TOPIC_MIN_IN_SYNC_REPLICA = "kafka.topic.min.in.sync.replica"; public static final String UPSTREAM_OFFSET = "upstream_offset"; + public static final String ADMIN_OPERATION_PROTOCOL_VERSION = "admin_operation_protocol_version"; public static final String PERSONA_NAME = "persona_name"; public static final String PERSONA_OWNERS = "persona_owners"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index 11452d95fe3..bc49f285c9c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -1,6 +1,7 @@ package com.linkedin.venice.controllerapi; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_PERMISSION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.AMPLIFICATION_FACTOR; import static com.linkedin.venice.controllerapi.ControllerApiConstants.BATCH_JOB_HEARTBEAT_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; @@ -1362,6 +1363,14 @@ public ControllerResponse updateAdminTopicMetadata( return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, ControllerResponse.class); } + public ControllerResponse updateAdminOperationProtocolVersion( + String clusterName, + Long adminOperationProtocolVersion) { + QueryParams params = + newParams().add(CLUSTER, clusterName).add(ADMIN_OPERATION_PROTOCOL_VERSION, adminOperationProtocolVersion); + return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, ControllerResponse.class); + } + public ControllerResponse deleteKafkaTopic(String topicName) { QueryParams params = newParams().add(TOPIC, topicName); return request(ControllerRoute.DELETE_KAFKA_TOPIC, params, ControllerResponse.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java index 8ddc43c8d5b..9e203916bba 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java @@ -2,6 +2,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_CONTROLLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_PERMISSION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.AMPLIFICATION_FACTOR; import static com.linkedin.venice.controllerapi.ControllerApiConstants.AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.BACKUP_STRATEGY; @@ -284,6 +285,10 @@ public enum ControllerRoute { UPDATE_ADMIN_TOPIC_METADATA( "/update_admin_topic_metadata", HttpMethod.POST, Arrays.asList(CLUSTER, EXECUTION_ID), NAME, OFFSET, UPSTREAM_OFFSET + ), + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION( + "/update_admin_operation_protocol_version", HttpMethod.POST, + Arrays.asList(CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION) ), DELETE_KAFKA_TOPIC("/delete_kafka_topic", HttpMethod.POST, Arrays.asList(CLUSTER, TOPIC)), CREATE_STORAGE_PERSONA( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java index 31e8e18b315..c76af357746 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java @@ -7,10 +7,13 @@ import com.linkedin.venice.AdminTool; import com.linkedin.venice.Arg; +import com.linkedin.venice.controllerapi.AdminTopicMetadataResponse; import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.controllerapi.MultiStoreResponse; import com.linkedin.venice.controllerapi.NewStoreResponse; import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixAdapterSerializer; @@ -19,7 +22,10 @@ import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceServerWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -27,6 +33,7 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.impl.client.ZkClient; @@ -186,4 +193,77 @@ public void testNodeReplicasReadinessCommand() throws Exception { clusterName, "--storage-node", Utils.getHelixNodeIdentifier(Utils.getHostName(), server.getPort()) }; AdminTool.main(nodeReplicasReadinessArgs); } + + @Test(timeOut = 4 * TEST_TIMEOUT) + public void testUpdateAdminOperationVersion() throws Exception { + Long defaultVersion = -1L; + Long newVersion = 80L; + String storeName = Utils.getUniqueString("test-store"); + try (VeniceTwoLayerMultiRegionMultiClusterWrapper venice = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) + .numberOfClusters(1) + .numberOfParentControllers(1) + .numberOfChildControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .build());) { + String clusterName = venice.getClusterNames()[0]; + + // Get the parent con†roller + VeniceControllerWrapper parentController = venice.getParentControllers().get(0); + ControllerClient parentControllerClient = new ControllerClient(clusterName, parentController.getControllerUrl()); + + // Verify the original metadata - default value + AdminTopicMetadataResponse originalMetadata = parentControllerClient.getAdminTopicMetadata(Optional.empty()); + Assert.assertEquals(originalMetadata.getAdminOperationProtocolVersion(), (long) defaultVersion); + Assert.assertEquals(originalMetadata.getExecutionId(), (long) defaultVersion); + Assert.assertEquals(originalMetadata.getOffset(), (long) defaultVersion); + Assert.assertEquals(originalMetadata.getUpstreamOffset(), (long) defaultVersion); + + // Create store + NewStoreResponse newStoreResponse = + parentControllerClient.createNewStore(storeName, "test", "\"string\"", "\"string\""); + Assert.assertFalse(newStoreResponse.isError()); + VersionCreationResponse versionCreationResponse = + parentControllerClient.emptyPush(storeName, Utils.getUniqueString("empty-push-1"), 1L); + Assert.assertFalse(versionCreationResponse.isError()); + + // Update store config + ControllerResponse updateStore = + parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setBatchGetLimit(100)); + Assert.assertFalse(updateStore.isError()); + + // Check the baseline metadata + AdminTopicMetadataResponse metdataAfterStoreCreation = + parentControllerClient.getAdminTopicMetadata(Optional.empty()); + long baselineExecutionId = metdataAfterStoreCreation.getExecutionId(); + long baselineOffset = metdataAfterStoreCreation.getOffset(); + long baselineUpstreamOffset = metdataAfterStoreCreation.getUpstreamOffset(); + long baselineAdminVersion = metdataAfterStoreCreation.getAdminOperationProtocolVersion(); + + // Execution id and offset should be positive now since we have created a store and updated the store config + Assert.assertEquals(baselineAdminVersion, (long) defaultVersion); + Assert.assertTrue(baselineExecutionId > 0); + Assert.assertTrue(baselineOffset > 0); + Assert.assertEquals(baselineUpstreamOffset, (long) defaultVersion); + + // Update the admin operation version to newVersion - 80 + String[] updateAdminOperationVersionArgs = + { "--update-admin-operation-protocol-version", "--url", parentController.getControllerUrl(), "--cluster", + clusterName, "--admin-operation-protocol-version", newVersion.toString() }; + + AdminTool.main(updateAdminOperationVersionArgs); + + // Verify the admin operation metadata version is updated and the remaining data is unchanged + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + AdminTopicMetadataResponse updatedMetadata = parentControllerClient.getAdminTopicMetadata(Optional.empty()); + Assert.assertEquals(updatedMetadata.getAdminOperationProtocolVersion(), (long) newVersion); + Assert.assertEquals(updatedMetadata.getExecutionId(), baselineExecutionId); + Assert.assertEquals(updatedMetadata.getOffset(), baselineOffset); + Assert.assertEquals(updatedMetadata.getUpstreamOffset(), baselineUpstreamOffset); + }); + } + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java index a95ebd8bec3..1e55abc3aa9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java @@ -32,6 +32,8 @@ import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.io.IOException; +import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -196,6 +198,51 @@ public void testParallelAdminExecutionTasks() throws IOException, InterruptedExc } } + @Test(timeOut = 2 * TIMEOUT) + public void testUpdateAdminOperationVersion() { + Long currentVersion = -1L; + Long newVersion = 18L; + try (VeniceTwoLayerMultiRegionMultiClusterWrapper venice = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) + .numberOfClusters(1) + .numberOfParentControllers(1) + .numberOfChildControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .build())) { + + String clusterName = venice.getClusterNames()[0]; + + // Get the child controller + VeniceControllerWrapper controller = venice.getChildRegions().get(0).getLeaderController(clusterName); + Admin admin = controller.getVeniceAdmin(); + + AdminConsumerService adminConsumerService = controller.getAdminConsumerServiceByCluster(clusterName); + + // Setup the original metadata + adminConsumerService.updateAdminOperationProtocolVersion(clusterName, currentVersion); + + // Verify that the original metadata is correct + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + Map adminTopicMetadata = admin.getAdminTopicMetadata(clusterName, Optional.empty()); + Assert.assertTrue(adminTopicMetadata.containsKey("adminOperationProtocolVersion")); + Assert.assertEquals(adminTopicMetadata.get("adminOperationProtocolVersion"), currentVersion); + }); + + // Update the admin operation version + admin.updateAdminOperationProtocolVersion(clusterName, newVersion); + + // Verify the admin operation metadata version is updated + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + Map adminTopicMetadata = admin.getAdminTopicMetadata(clusterName, Optional.empty()); + Assert.assertTrue(adminTopicMetadata.containsKey("adminOperationProtocolVersion")); + Assert.assertEquals(adminTopicMetadata.get("adminOperationProtocolVersion"), newVersion); + }); + } + } + private Runnable getRunnable( VeniceTwoLayerMultiRegionMultiClusterWrapper venice, String storeName, diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java index 087b359bde7..4890db30b8e 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java @@ -16,7 +16,7 @@ public class InMemoryAdminTopicMetadataAccessor extends AdminTopicMetadataAccess @Override public void updateMetadata(String clusterName, Map metadata) { - inMemoryMetadata = metadata; + inMemoryMetadata.putAll(metadata); LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, metadata); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 11298442e95..053c448bf97 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -953,6 +953,8 @@ void updateAdminTopicMetadata( Optional offset, Optional upstreamOffset); + void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion); + void createStoragePersona( String clusterName, String name, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java index 66c7b75daee..51f77b45b76 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java @@ -3,6 +3,7 @@ import com.linkedin.venice.utils.Pair; import java.util.HashMap; import java.util.Map; +import java.util.Optional; public abstract class AdminTopicMetadataAccessor { @@ -14,17 +15,24 @@ public abstract class AdminTopicMetadataAccessor { */ private static final String UPSTREAM_OFFSET_KEY = "upstreamOffset"; private static final String EXECUTION_ID_KEY = "executionId"; + private static final String ADMIN_OPERATION_PROTOCOL_VERSION_KEY = "adminOperationProtocolVersion"; private static final long UNDEFINED_VALUE = -1; /** * @return a map with {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY}, {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}, - * {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY} specified to input values. + * {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY}, {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY} + * specified to input values. */ - public static Map generateMetadataMap(long localOffset, long upstreamOffset, long executionId) { + public static Map generateMetadataMap( + Optional localOffset, + Optional upstreamOffset, + Optional executionId, + Optional adminOperationProtocolVersion) { Map metadata = new HashMap<>(); - metadata.put(OFFSET_KEY, localOffset); - metadata.put(UPSTREAM_OFFSET_KEY, upstreamOffset); - metadata.put(EXECUTION_ID_KEY, executionId); + localOffset.ifPresent(offset -> metadata.put(OFFSET_KEY, offset)); + upstreamOffset.ifPresent(offset -> metadata.put(UPSTREAM_OFFSET_KEY, offset)); + executionId.ifPresent(id -> metadata.put(EXECUTION_ID_KEY, id)); + adminOperationProtocolVersion.ifPresent(version -> metadata.put(ADMIN_OPERATION_PROTOCOL_VERSION_KEY, version)); return metadata; } @@ -46,7 +54,14 @@ public static long getExecutionId(Map metadata) { } /** - * Update all relevant metadata for a given cluster in a single transaction. + * @return the value to which the specified key is mapped to {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY}. + */ + public static long getAdminOperationProtocolVersion(Map metadata) { + return metadata.getOrDefault(ADMIN_OPERATION_PROTOCOL_VERSION_KEY, UNDEFINED_VALUE); + } + + /** + * Update specific metadata for a given cluster in a single transaction with information provided in metadata. * @param clusterName of the cluster at interest. * @param metadata map containing relevant information. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 45e6cb50510..db1ce315564 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7498,16 +7498,18 @@ public Optional getAdminCommandExecutionTracker(St } /** - * @return cluster-level execution id, offset and upstream offset. If store name is specified, it returns store-level execution id. + * @return cluster-level execution id, offset, upstream offset, and admin operation protocol version. + * If store name is specified, it returns store-level execution id. */ public Map getAdminTopicMetadata(String clusterName, Optional storeName) { if (storeName.isPresent()) { - Long executionId = executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).get(storeName.get()); + Long executionId = getExecutionIdAccessor().getLastSucceededExecutionIdMap(clusterName).get(storeName.get()); return executionId == null ? Collections.emptyMap() - : AdminTopicMetadataAccessor.generateMetadataMap(-1, -1, executionId); + : AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(-1L), Optional.of(-1L), Optional.of(executionId), Optional.of(-1L)); } - return adminConsumerServices.get(clusterName).getAdminTopicMetadata(clusterName); + return getAdminConsumerService(clusterName).getAdminTopicMetadata(clusterName); } /** @@ -7521,16 +7523,24 @@ public void updateAdminTopicMetadata( Optional offset, Optional upstreamOffset) { if (storeName.isPresent()) { - executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId); + getExecutionIdAccessor().updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId); } else { if (!offset.isPresent() || !upstreamOffset.isPresent()) { throw new VeniceException("Offsets must be provided to update cluster-level admin topic metadata"); } - adminConsumerServices.get(clusterName) + getAdminConsumerService(clusterName) .updateAdminTopicMetadata(clusterName, executionId, offset.get(), upstreamOffset.get()); } } + /** + * Update the version of admin operation protocol in admin topic metadata + */ + public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) { + getAdminConsumerService(clusterName) + .updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } + /** * @see Admin#getRoutersClusterConfig(String) */ @@ -7670,6 +7680,11 @@ public VeniceProperties getPubSubSSLProperties(String pubSubBrokerAddress) { return this.getPubSubSSLPropertiesFromControllerConfig(pubSubBrokerAddress); } + // public for testing purpose + public AdminConsumerService getAdminConsumerService(String clusterName) { + return adminConsumerServices.get(clusterName); + } + private void startMonitorOfflinePush( String clusterName, String kafkaTopic, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 2e680a42fb8..0f895efcec8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -4268,11 +4268,11 @@ public Optional getAdminCommandExecutionTracker(St } /** - * Unsupported operation in the parent controller. + * Get AdminTopicMetadata from parent controller */ @Override public Map getAdminTopicMetadata(String clusterName, Optional storeName) { - throw new VeniceUnsupportedOperationException("getAdminTopicMetadata"); + return getVeniceHelixAdmin().getAdminTopicMetadata(clusterName, storeName); } /** @@ -4288,6 +4288,14 @@ public void updateAdminTopicMetadata( throw new VeniceUnsupportedOperationException("updateAdminTopicMetadata"); } + /** + * Update AdminOperationProtocolVersion in metadata + */ + @Override + public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) { + getVeniceHelixAdmin().updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } + /** * Unsupported operation in the parent controller. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java index 0f892f654c5..cda6fbff92c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java @@ -36,13 +36,25 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad } /** + * Update the upstream metadata map for the given cluster with specific information provided in metadataDelta + * * @see AdminTopicMetadataAccessor#updateMetadata(String, Map) */ @Override - public void updateMetadata(String clusterName, Map metadata) { + public void updateMetadata(String clusterName, Map metadataDelta) { String path = getAdminTopicMetadataNodePath(clusterName); - HelixUtils.update(zkMapAccessor, path, metadata, ZK_UPDATE_RETRY); - LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, metadata); + HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> { + if (currentMetadataMap == null) { + currentMetadataMap = new HashMap<>(); + } + LOGGER.info( + "Updating AdminTopicMetadata map for cluster: {}. Current metadata: {}. New delta metadata: {}", + clusterName, + currentMetadataMap, + metadataDelta); + currentMetadataMap.putAll(metadataDelta); + return currentMetadataMap; + }); } /** diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java index abceff1af2e..00cad49f786 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java @@ -17,6 +17,7 @@ import com.linkedin.venice.service.AbstractVeniceService; import com.linkedin.venice.utils.DaemonThreadFactory; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.locks.AutoCloseableLock; import io.tehuti.metrics.MetricsRepository; import java.util.Map; import java.util.Optional; @@ -187,8 +188,15 @@ public Map getAdminTopicMetadata(String clusterName) { */ public void updateAdminTopicMetadata(String clusterName, long executionId, long offset, long upstreamOffset) { if (clusterName.equals(config.getClusterName())) { - Map metadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId); - adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + try (AutoCloseableLock ignore = + admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { + Map metadata = AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(offset), + Optional.of(upstreamOffset), + Optional.of(executionId), + Optional.empty()); + adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + } } else { throw new VeniceException( "This AdminConsumptionService is for cluster: " + config.getClusterName() @@ -196,6 +204,27 @@ public void updateAdminTopicMetadata(String clusterName, long executionId, long } } + /** + * Update the admin operation protocol version for the given cluster. + */ + public void updateAdminOperationProtocolVersion(String clusterName, long adminOperationProtocolVersion) { + if (clusterName.equals(config.getClusterName())) { + try (AutoCloseableLock ignore = + admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { + Map metadata = AdminTopicMetadataAccessor.generateMetadataMap( + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.of(adminOperationProtocolVersion)); + adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + } + } else { + throw new VeniceException( + "This AdminConsumptionService is for cluster: " + config.getClusterName() + + ". Cannot update the version for cluster: " + clusterName); + } + } + private PubSubConsumerAdapter createKafkaConsumer(String clusterName) { String pubSubServerUrl = remoteConsumptionEnabled ? remoteKafkaServerUrl.get() : localKafkaServerUrl; Properties kafkaConsumerProperties = admin.getPubSubSSLProperties(pubSubServerUrl).toProperties(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index c294fc1a4e2..2fef1d90311 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -34,6 +34,7 @@ import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.locks.AutoCloseableLock; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -896,16 +897,25 @@ private void persistAdminTopicMetadata() { // Skip since there are no new admin messages processed. return; } - Map metadata = remoteConsumptionEnabled - ? AdminTopicMetadataAccessor - .generateMetadataMap(localOffsetCheckpointAtStartTime, lastOffset, lastDelegatedExecutionId) - : AdminTopicMetadataAccessor - .generateMetadataMap(lastOffset, upstreamOffsetCheckpointAtStartTime, lastDelegatedExecutionId); - adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); - lastPersistedOffset = lastOffset; - lastPersistedExecutionId = lastDelegatedExecutionId; - LOGGER.info("Updated lastPersistedOffset to {}", lastPersistedOffset); - stats.setAdminConsumptionCheckpointOffset(lastPersistedOffset); + try (AutoCloseableLock ignore = + admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { + Map metadata = remoteConsumptionEnabled + ? AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(localOffsetCheckpointAtStartTime), + Optional.of(lastOffset), + Optional.of(lastDelegatedExecutionId), + Optional.empty()) + : AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(lastOffset), + Optional.of(upstreamOffsetCheckpointAtStartTime), + Optional.of(lastDelegatedExecutionId), + Optional.empty()); + adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + lastPersistedOffset = lastOffset; + lastPersistedExecutionId = lastDelegatedExecutionId; + LOGGER.info("Updated lastPersistedOffset to {}", lastPersistedOffset); + stats.setAdminConsumptionCheckpointOffset(lastPersistedOffset); + } } void skipMessageWithOffset(long offset) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java index eec4fc44207..8b3d8d58ffa 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java @@ -95,6 +95,7 @@ import static com.linkedin.venice.controllerapi.ControllerRoute.STORE; import static com.linkedin.venice.controllerapi.ControllerRoute.STORE_MIGRATION_ALLOWED; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ACL; +import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_CLUSTER_CONFIG; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_LOG_COMPACTION; @@ -636,7 +637,11 @@ public boolean startInner() throws Exception { admin, adminTopicMetadataRoutes .updateAdminTopicMetadata(admin, requestHandler.getClusterAdminOpsRequestHandler()))); - + httpService.post( + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getPath(), + new VeniceParentControllerRegionStateHandler( + admin, + adminTopicMetadataRoutes.updateAdminOperationProtocolVersion(admin))); httpService.post( DELETE_KAFKA_TOPIC.getPath(), new VeniceParentControllerRegionStateHandler(admin, storesRoutes.deleteKafkaTopic(admin))); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index d854a9d5fa8..fa2a6300e04 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -1,11 +1,13 @@ package com.linkedin.venice.controller.server; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.EXECUTION_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OFFSET; import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPSTREAM_OFFSET; import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ADMIN_TOPIC_METADATA; +import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA; import com.linkedin.venice.HttpConstants; @@ -54,6 +56,7 @@ public Route getAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler re if (!storeName.isPresent()) { responseObject.setOffset(adminTopicMetadata.getOffset()); responseObject.setUpstreamOffset(adminTopicMetadata.getUpstreamOffset()); + responseObject.setAdminOperationProtocolVersion(adminTopicMetadata.getAdminOperationProtocolVersion()); } } catch (Throwable e) { responseObject.setError(e); @@ -101,4 +104,32 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); }; } + + public Route updateAdminOperationProtocolVersion(Admin admin) { + return (request, response) -> { + AdminTopicMetadataResponse responseObject = new AdminTopicMetadataResponse(); + response.type(HttpConstants.JSON); + try { + if (!isAllowListUser(request)) { + response.status(HttpStatus.SC_FORBIDDEN); + responseObject.setError("Only admin users are allowed to run " + request.url()); + responseObject.setErrorType(ErrorType.BAD_REQUEST); + return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); + } + + AdminSparkServer.validateParams(request, UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getParams(), admin); + String clusterName = request.queryParams(CLUSTER); + Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)); + + responseObject.setCluster(clusterName); + responseObject.setAdminOperationProtocolVersion(adminOperationProtocolVersion); + + admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } catch (Throwable e) { + responseObject.setError(e); + AdminSparkServer.handleError(new VeniceException(e), request, response); + } + return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); + }; + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index 8a6d0c425b9..ccdcdb097b3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -102,6 +102,8 @@ public AdminTopicMetadataGrpcResponse getAdminTopicMetadata(AdminTopicMetadataGr Pair offsets = AdminTopicMetadataAccessor.getOffsets(metadata); adminMetadataBuilder.setOffset(offsets.getFirst()); adminMetadataBuilder.setUpstreamOffset(offsets.getSecond()); + adminMetadataBuilder + .setAdminOperationProtocolVersion(AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata)); } else { adminMetadataBuilder.setStoreName(storeName); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 1ee4f23849a..5e91eb7d45e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -25,6 +25,7 @@ import static org.testng.Assert.expectThrows; import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.controller.stats.VeniceAdminStats; import com.linkedin.venice.exceptions.VeniceException; @@ -955,4 +956,89 @@ public void testCleanupWhenPushCompleteWithViewConfigs() { assertEquals(pubSubTopics.get(i).getName(), expectedUpdateCompactionTopics.get(i)); } } + + @Test + public void testGetAdminTopicMetadata() { + String clusterName = "test-cluster"; + String storeName = "test-store"; + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.of(storeName)); + doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.empty()); + + // Case 1: Not store name provided + Map remoteMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(10L), Optional.of(-1L), Optional.of(1L), Optional.of(1L)); + AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); + when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); + when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); + + Map metadata = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.empty()); + assertEquals(metadata, remoteMetadata); + + // Case 2: Store name is provided + ExecutionIdAccessor executionIdAccessor = mock(ExecutionIdAccessor.class); + Map executionIdMap = new HashMap<>(); + executionIdMap.put(storeName, 10L); + when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); + when(executionIdAccessor.getLastSucceededExecutionIdMap(anyString())).thenReturn(executionIdMap); + when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); + when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); + + Map expectedMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(-1L), Optional.of(-1L), Optional.of(10L), Optional.of(-1L)); + Map metadataForStore = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.of(storeName)); + assertEquals(metadataForStore, expectedMetadata); + } + + @Test + public void testUpdateAdminTopicMetadata() { + String clusterName = "test-cluster"; + String storeName = "test-store"; + long executionId = 10L; + Long offset = 10L; + Long upstreamOffset = 1L; + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doCallRealMethod().when(veniceHelixAdmin) + .updateAdminTopicMetadata(clusterName, executionId, Optional.of(storeName), Optional.empty(), Optional.empty()); + doCallRealMethod().when(veniceHelixAdmin) + .updateAdminTopicMetadata( + clusterName, + executionId, + Optional.empty(), + Optional.of(offset), + Optional.of(upstreamOffset)); + + // Case 1: Store name is provided + ExecutionIdAccessor executionIdAccessor = mock(ExecutionIdAccessor.class); + when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); + + veniceHelixAdmin + .updateAdminTopicMetadata(clusterName, executionId, Optional.of(storeName), Optional.empty(), Optional.empty()); + verify(executionIdAccessor, times(1)).updateLastSucceededExecutionIdMap(clusterName, storeName, executionId); + + // Case 2: Store name is not provided + AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); + when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); + veniceHelixAdmin.updateAdminTopicMetadata( + clusterName, + executionId, + Optional.empty(), + Optional.of(offset), + Optional.of(upstreamOffset)); + verify(executionIdAccessor, never()).updateLastSucceededExecutionId(anyString(), anyLong()); + verify(adminConsumerService, times(1)).updateAdminTopicMetadata(clusterName, executionId, offset, upstreamOffset); + } + + @Test + public void testUpdateAdminOperationProtocolVersion() { + String clusterName = "test-cluster"; + Long adminProtocolVersion = 10L; + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doCallRealMethod().when(veniceHelixAdmin).updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion); + AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); + when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); + + veniceHelixAdmin.updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion); + verify(adminConsumerService, times(1)).updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion); + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index d1c1adfc4be..9f355ffdf29 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -299,7 +299,9 @@ public void testAddStore() { .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); @@ -378,8 +380,9 @@ public void testCreateStoreForMultiCluster() { String valueSchemaStr = "\"string\""; when(veniceWriter.put(any(), any(), anyInt())).then(invocation -> { // Once we send message to topic through venice writer, return offset 1 - when(zkClient.readData(metadataPath, null)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + when(zkClient.readData(metadataPath, null)).thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); CompletableFuture future = mock(CompletableFuture.class); doReturn(new SimplePubSubProduceResultImpl(adminTopic, partitionId, 1, -1)).when(future).get(); return future; @@ -491,7 +494,9 @@ public void testAddValueSchema() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.addValueSchema(clusterName, storeName, valueSchemaStr, DirectionalSchemaCompatibilityType.FULL); @@ -543,7 +548,9 @@ public void testAddDerivedSchema() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.addDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaStr); @@ -570,7 +577,9 @@ public void testDisableStoreRead() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -606,7 +615,9 @@ public void testDisableStoreWrite() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -647,7 +658,9 @@ public void testDisableStoreWriteWhenStoreDoesNotExist() { when(zkClient.readData(zkMetadataNodePath, null)) .thenReturn(new OffsetRecord(AvroProtocolDefinition.PARTITION_STATE.getSerializer())) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); assertThrows(VeniceNoStoreException.class, () -> parentAdmin.setStoreWriteability(clusterName, storeName, false)); @@ -660,7 +673,9 @@ public void testEnableStoreRead() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -696,7 +711,9 @@ public void testEnableStoreWrite() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -735,7 +752,9 @@ public void testKillOfflinePushJob() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); Store store = mock(Store.class); doReturn(store).when(internalAdmin).getStore(clusterName, pubSubTopic.getStoreName()); @@ -798,7 +817,9 @@ public void testIdempotentIncrementVersionWhenNoPreviousTopics() { .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); partialMockParentAdmin.incrementVersionIdempotent(clusterName, storeName, pushJobId, 1, 1); verify(internalAdmin).addVersionAndTopicOnly( clusterName, @@ -929,7 +950,9 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistAndOfflineJobIs .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); Version newVersion = partialMockParentAdmin.incrementVersionIdempotent( clusterName, storeName, @@ -1019,7 +1042,9 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistButTruncated() .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); partialMockParentAdmin.incrementVersionIdempotent( clusterName, storeName, @@ -1783,7 +1808,9 @@ public void testUpdateStore() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); UpdateStoreQueryParams storeQueryParams1 = new UpdateStoreQueryParams().setBlobTransferEnabled(true); parentAdmin.initStorageCluster(clusterName); @@ -1913,7 +1940,9 @@ public void testUpdateStoreNativeReplicationSourceFabric() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin @@ -1938,7 +1967,9 @@ public void testUpdateStoreTargetSwapRegion() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams().setTargetRegionSwap("prod") .setTargetRegionSwapWaitTime(100) @@ -1976,7 +2007,9 @@ public void testDisableHybridConfigWhenActiveActiveOrIncPushConfigIsEnabled() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); // When user disable hybrid but also try to manually turn on A/A or Incremental Push, update operation should fail @@ -2147,7 +2180,9 @@ public void testRemoveStoreViewConfig() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.updateStore( @@ -2178,7 +2213,9 @@ public void testUpdateStoreWithBadPartitionerConfigs() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); assertThrows( @@ -2230,7 +2267,9 @@ public void testDeleteStore() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.deleteStore(clusterName, storeName, false, 0, true); @@ -2569,7 +2608,9 @@ public void testAdminCanKillLingeringVersion(boolean isIncrementalPush) { .put(any(), any(), anyInt()); mockControllerClients(storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); if (isIncrementalPush) { /** @@ -2727,7 +2768,9 @@ public void testHybridAndIncrementalUpdateStoreCommands(boolean aaEnabled) { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.updateStore( @@ -2792,7 +2835,9 @@ public void testSendAdminMessageAcquiresClusterReadLock() { .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); String storeName = "test-store"; String owner = "test-owner"; @@ -2967,7 +3012,9 @@ private Store setupForStoreViewConfigUpdateTest(String storeName) { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); return store; diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java index ba63bf15e3c..36618899709 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java @@ -126,7 +126,9 @@ public void testDeleteStoreWithAuthorization() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); parentAdmin.initStorageCluster(clusterName); parentAdmin.deleteStore(clusterName, storeName, false, 0, true); @@ -175,7 +177,9 @@ public void testUpdateAclException() { .checkPreConditionForAclOp(clusterName, storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(new OffsetRecord(partitionStateSerializer)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); Assert.assertThrows( VeniceNoStoreException.class, @@ -193,7 +197,9 @@ public void testGetAclException() { .checkPreConditionForAclOp(clusterName, storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(new OffsetRecord(partitionStateSerializer)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); Assert.assertThrows(VeniceNoStoreException.class, () -> parentAdmin.getAclForStore(clusterName, storeName)); } @@ -208,7 +214,9 @@ public void testDeleteAclException() { .checkPreConditionForAclOp(clusterName, storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(new OffsetRecord(partitionStateSerializer)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); Assert.assertThrows(VeniceNoStoreException.class, () -> parentAdmin.deleteAclForStore(clusterName, storeName)); Assert.assertEquals(0, authorizerService.clearAclCounter); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java new file mode 100644 index 00000000000..aedb176710b --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java @@ -0,0 +1,89 @@ +package com.linkedin.venice.controller; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.linkedin.venice.helix.HelixAdapterSerializer; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestZkAdminTopicMetadataAccessor { + private ZkClient zkClient; + private HelixAdapterSerializer adapterSerializer; + private ZkAdminTopicMetadataAccessor zkAdminTopicMetadataAccessor; + + @BeforeMethod + public void setUp() { + zkClient = mock(ZkClient.class); + adapterSerializer = mock(HelixAdapterSerializer.class); + zkAdminTopicMetadataAccessor = new ZkAdminTopicMetadataAccessor(zkClient, adapterSerializer); + } + + @Test + public void testUpdateMetadata() { + String clusterName = "test-cluster"; + + // Original metadata + Map currentMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); + + // New metadata + Map newMetadata = new HashMap<>(); + newMetadata.put("offset", 100L); + + // Updated metadata with new metadata + Map updatedMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(100L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); + + String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); + try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class)) { + dataTreeMockedStatic.when(() -> DataTree.copyStat(any(), any())).thenAnswer(invocation -> null); + Stat readStat = new Stat(); + + when(zkClient.readData(metadataPath, readStat)).thenReturn(null) // Case 1: when there is no metadata + .thenReturn(currentMetadata); // Case 2: the metadata is not null + + // Case 1: when there is no metadata - null + zkAdminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); + verify(zkClient, times(1)).writeDataGetStat(metadataPath, newMetadata, 0); + + // Case 2: the metadata is not null + zkAdminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); + verify(zkClient, times(1)).writeDataGetStat(metadataPath, updatedMetadata, 0); + + // Verify that the metadata path got read 2 times + verify(zkClient, times(2)).readData(metadataPath, readStat); + } + } + + @Test + public void testGetMetadata() { + String clusterName = "test-cluster"; + Map currentMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); + String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); + + when(zkClient.readData(metadataPath, null)).thenReturn(null).thenReturn(currentMetadata); + + // Case 1: when there is no metadata + Map metadata = zkAdminTopicMetadataAccessor.getMetadata(clusterName); + assertEquals(metadata, new HashMap<>()); + + // Case 2: the metadata is not null + metadata = zkAdminTopicMetadataAccessor.getMetadata(clusterName); + assertEquals(metadata, currentMetadata); + } +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index 71fcaa5034f..6c6e7244c67 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -19,6 +19,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyDouble; @@ -27,6 +28,7 @@ import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; @@ -42,6 +44,7 @@ import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.controller.AdminTopicMetadataAccessor; import com.linkedin.venice.controller.ExecutionIdAccessor; +import com.linkedin.venice.controller.HelixVeniceClusterResources; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.kafka.AdminTopicUtils; import com.linkedin.venice.controller.kafka.protocol.admin.AddVersion; @@ -97,6 +100,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.locks.ClusterLockManager; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.io.IOException; @@ -189,6 +193,12 @@ public void methodSetup() { doReturn(new HashSet<>(Arrays.asList(pubSubTopic))).when(topicManager).listTopics(); doReturn(topicManager).when(admin).getTopicManager(); doReturn(true).when(topicManager).containsTopicAndAllPartitionsAreOnline(pubSubTopic); + + HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class, RETURNS_DEEP_STUBS); + ClusterLockManager lockManager = new ClusterLockManager(clusterName); + doReturn(resources).when(admin).getHelixVeniceClusterResources(clusterName); + doReturn(lockManager).when(resources).getClusterLockManager(); + doCallRealMethod().when(resources).getStoreMetadataRepository(); } @AfterMethod @@ -734,7 +744,11 @@ public void testRunWithFalsePositiveMissingMessagesWhenFirstBecomeLeaderControll AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); adminTopicMetadataAccessor.updateMetadata( clusterName, - AdminTopicMetadataAccessor.generateMetadataMap(metadataForStoreName0Future.get().getOffset(), -1, 1)); + AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(metadataForStoreName0Future.get().getOffset()), + Optional.of(-1L), + Optional.of(1L), + Optional.empty())); // Write a message with a skipped execution id but a different producer metadata. veniceWriter.put( @@ -820,7 +834,8 @@ public void testRunWithBiggerStartingOffset() throws InterruptedException, IOExc // The store doesn't exist doReturn(false).when(admin).hasStore(clusterName, storeName1); doReturn(false).when(admin).hasStore(clusterName, storeName2); - Map newMetadata = AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1); + Map newMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty()); adminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false); @@ -1109,7 +1124,8 @@ public void testResubscribe() throws IOException, InterruptedException, TimeoutE getKillOfflinePushJobMessage(clusterName, storeTopicName, 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); long offset = future.get(TIMEOUT, TimeUnit.MILLISECONDS).getOffset(); - Map newMetadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, -1, 4L); + Map newMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(offset), Optional.of(-1L), Optional.of(4L), Optional.empty()); adminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName, 4L); // Resubscribe to the admin topic and make sure it can still process new admin messages diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java index 39e7b3a263a..863f50f54b0 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java @@ -1,6 +1,7 @@ package com.linkedin.venice.controller.server; import static com.linkedin.venice.VeniceConstants.CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.EXECUTION_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_NAME; @@ -215,4 +216,72 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception { assertNotNull(responseObject.getError()); assertTrue(responseObject.getError().contains("Internal error")); } + + @Test + public void testUpdateAdminOperationProtocolVersion() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + String adminOperationProtocolVersion = "1"; + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER); + when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion); + + Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin); + + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + + assertEquals(responseObject.getCluster(), TEST_CLUSTER); + assertEquals(responseObject.getAdminOperationProtocolVersion(), 1L); + assertNull(responseObject.getError()); + } + + @Test + public void testUpdateAdminOperationProtocolVersionHandlesUnauthorizedAccess() throws Exception { + DynamicAccessController accessController = mock(DynamicAccessController.class); + when(accessController.isAllowlistUsers(any(), any(), any())).thenReturn(false); + HttpServletRequest httpServletRequest = mock(HttpServletRequest.class); + when(request.raw()).thenReturn(httpServletRequest); + X509Certificate certificate = mock(X509Certificate.class); + X500Principal principal = new X500Principal("CN=foo"); + X509Certificate[] certificates = new X509Certificate[] { mock(X509Certificate.class) }; + when(httpServletRequest.getAttribute(CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME)).thenReturn(certificates); + doReturn(principal).when(certificate).getSubjectX500Principal(); + doReturn(httpServletRequest).when(request).raw(); + + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + String adminOperationProtocolVersion = "1"; + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER); + when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion); + + Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController)) + .updateAdminOperationProtocolVersion(mockAdmin); + + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + + assertNotNull(responseObject.getError()); + assertTrue(responseObject.getError().contains("Only admin users are allowed")); + } + + @Test + public void testUpdateAdminOperationProtocolVersionHandlesMissingParams() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(null); // Missing cluster parameter + + Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin); + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + + verify(requestHandler, never()).getAdminTopicMetadata(any()); + assertNotNull(responseObject.getError()); + assertTrue(responseObject.getError().contains("cluster_name is a required parameter")); + } } diff --git a/services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000000..ca6ee9cea8e --- /dev/null +++ b/services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file