diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index ef28402ad38..d67c0f004d9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -80,6 +80,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER; import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED; +import static com.linkedin.venice.ConfigKeys.SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_SERVICE_ENABLED; @@ -564,6 +565,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled; private final int zstdDictCompressionLevel; private final long maxWaitAfterUnsubscribeMs; + private final boolean deleteUnassignedPartitionsOnStartup; public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException { this(serverProperties, Collections.emptyMap()); @@ -951,6 +953,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map generateService( Map> kafkaClusterMap, String serverD2ServiceName) { return (serviceName, dataDirectory) -> { + boolean serverDeleteUnassignedPartitionsOnStartup = + Boolean.parseBoolean(featureProperties.getProperty(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START, "false")); boolean enableServerAllowlist = Boolean.parseBoolean(featureProperties.getProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, "false")); boolean sslToKafka = Boolean.parseBoolean(featureProperties.getProperty(SERVER_SSL_TO_KAFKA, "false")); @@ -257,7 +262,8 @@ static StatefulServiceProvider generateService( pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName()) .put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000) .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000) - .put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); + .put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true) + .put(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP, serverDeleteUnassignedPartitionsOnStartup); if (sslToKafka) { serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java index 5a9f0079a27..354d8739ef0 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java @@ -1,6 +1,7 @@ package com.linkedin.venice.server; import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED; +import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START; import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SERVER_ALLOW_LIST; import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_IS_AUTO_JOIN; @@ -203,6 +204,7 @@ public void testStartServerAndShutdownWithPartitionAssignmentVerification() { Properties featureProperties = new Properties(); featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true)); featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true)); + featureProperties.setProperty(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START, Boolean.toString(true)); cluster.addVeniceServer(featureProperties, new Properties()); VeniceServerWrapper server = cluster.getVeniceServers().get(0); Assert.assertTrue(server.getVeniceServer().isStarted()); @@ -218,6 +220,7 @@ public void testStartServerAndShutdownWithPartitionAssignmentVerification() { Assert.assertEquals(storageService.getStorageEngine(storeName).getPartitionIds().size(), 3); cluster.stopVeniceServer(server.getPort()); + Assert.assertFalse(server.getVeniceServer().isStarted()); // Create new servers so partition assignment is removed for the offline participant cluster.addVeniceServer(featureProperties, new Properties());