Skip to content

Commit

Permalink
[server] Make cleaning up partitions on startup configurable. Off by …
Browse files Browse the repository at this point in the history
…default (#1380)

We need to test this feature thoroughly before rolling it out in prod.
  • Loading branch information
kvargha authored Dec 10, 2024
1 parent 9f852b0 commit 54ba087
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER;
import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_SERVICE_ENABLED;
Expand Down Expand Up @@ -564,6 +565,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled;
private final int zstdDictCompressionLevel;
private final long maxWaitAfterUnsubscribeMs;
private final boolean deleteUnassignedPartitionsOnStartup;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
Expand Down Expand Up @@ -951,6 +953,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
}
maxWaitAfterUnsubscribeMs =
serverProperties.getLong(SERVER_MAX_WAIT_AFTER_UNSUBSCRIBE_MS, TimeUnit.MINUTES.toMillis(30));

deleteUnassignedPartitionsOnStartup =
serverProperties.getBoolean(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP, false);
}

long extractIngestionMemoryLimit(
Expand Down Expand Up @@ -1728,4 +1733,8 @@ public int getZstdDictCompressionLevel() {
public long getMaxWaitAfterUnsubscribeMs() {
return maxWaitAfterUnsubscribeMs;
}

public boolean isDeleteUnassignedPartitionsOnStartupEnabled() {
return deleteUnassignedPartitionsOnStartup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ public synchronized AbstractStorageEngine openStore(
}

public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) {
if (!serverConfig.isDeleteUnassignedPartitionsOnStartupEnabled()) {
return;
}

if (manager == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
configLoaderField.setAccessible(true);
configLoaderField.set(mockStorageService, mockVeniceConfigLoader);

VeniceServerConfig mockServerConfig = mock(VeniceServerConfig.class);
when(mockServerConfig.isDeleteUnassignedPartitionsOnStartupEnabled()).thenReturn(true);
Field serverConfigField = StorageService.class.getDeclaredField("serverConfig");
serverConfigField.setAccessible(true);
serverConfigField.set(mockStorageService, mockServerConfig);

doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
verify(abstractStorageEngine).dropPartition(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2342,4 +2342,7 @@ private ConfigKeys() {
"server.nearline.workload.producer.throughput.optimization.enabled";

public static final String SERVER_ZSTD_DICT_COMPRESSION_LEVEL = "server.zstd.dict.compression.level";

public static final String SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP =
"server.delete.unassigned.partitions.on.startup";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.linkedin.venice.ConfigKeys.PUB_SUB_CONSUMER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_PRODUCER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class VeniceServerWrapper extends ProcessWrapper implements MetricsAware
public static final String SERVER_ENABLE_SSL = "server_enable_ssl";
public static final String SERVER_SSL_TO_KAFKA = "server_ssl_to_kafka";
public static final String CLIENT_CONFIG_FOR_CONSUMER = "client_config_for_consumer";
public static final String SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START =
"server_delete_unassigned_partitions_on_start";

private TestVeniceServer veniceServer;
private final VeniceProperties serverProps;
Expand Down Expand Up @@ -197,6 +200,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
Map<String, Map<String, String>> kafkaClusterMap,
String serverD2ServiceName) {
return (serviceName, dataDirectory) -> {
boolean serverDeleteUnassignedPartitionsOnStartup =
Boolean.parseBoolean(featureProperties.getProperty(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START, "false"));
boolean enableServerAllowlist =
Boolean.parseBoolean(featureProperties.getProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, "false"));
boolean sslToKafka = Boolean.parseBoolean(featureProperties.getProperty(SERVER_SSL_TO_KAFKA, "false"));
Expand Down Expand Up @@ -257,7 +262,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName())
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000)
.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true);
.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true)
.put(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP, serverDeleteUnassignedPartitionsOnStartup);
if (sslToKafka) {
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name());
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.server;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SERVER_ALLOW_LIST;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_IS_AUTO_JOIN;

Expand Down Expand Up @@ -203,6 +204,7 @@ public void testStartServerAndShutdownWithPartitionAssignmentVerification() {
Properties featureProperties = new Properties();
featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true));
featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true));
featureProperties.setProperty(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_START, Boolean.toString(true));
cluster.addVeniceServer(featureProperties, new Properties());
VeniceServerWrapper server = cluster.getVeniceServers().get(0);
Assert.assertTrue(server.getVeniceServer().isStarted());
Expand All @@ -218,6 +220,7 @@ public void testStartServerAndShutdownWithPartitionAssignmentVerification() {
Assert.assertEquals(storageService.getStorageEngine(storeName).getPartitionIds().size(), 3);

cluster.stopVeniceServer(server.getPort());
Assert.assertFalse(server.getVeniceServer().isStarted());

// Create new servers so partition assignment is removed for the offline participant
cluster.addVeniceServer(featureProperties, new Properties());
Expand Down

0 comments on commit 54ba087

Please sign in to comment.