Skip to content

Commit 91d249d

Browse files
authored
[server] Restore data partition in isolated process in Venice Server (#810)
This PR tries to address the issue of cleaning up data correctly for ingestion isolation enabled server. Originally, we observed that certain data partition is not deleted when a OFFLINE -> DROPPED message is issued for stale version upon server restart. The message is not executed as we did not restore all the data partitions upon server start in any of the processes. The first fix we implemented is in #617 , where we added a clean up service in isolated process. However, it is not correct, as the clean up service tries to remove partitions opened in the forked process, which is metadata partition only. This leaves the data partitions still unopened and lingering forever. This PR tries to fix it thoroughly by adjusting the data restore logic for II. It will by default restore everything in II process. For those partitions to receive OFFLINE->DROPPED message, it will be sent to isolated process and get executed. For old storage engines, it shall also be opened and should be cleaned up eventually, as restore metadata partition will recreate the metadata partition, even if it is deleted before.
1 parent 44ff335 commit 91d249d

File tree

3 files changed

+30
-19
lines changed

3 files changed

+30
-19
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -390,15 +390,17 @@ void executeCommandWithRetry(
390390
Runnable mainProcessCommandRunnable) {
391391
boolean isTopicPartitionHosted = isTopicPartitionHosted(topicName, partition);
392392

393-
// drop storage partition in main process if it does not exist
394-
if (command == REMOVE_PARTITION && !isTopicPartitionHosted) {
395-
mainProcessCommandRunnable.run();
396-
return;
397-
}
398-
399393
do {
394+
/**
395+
* There are two cases where we execute the command to the main process:
396+
* (1) The main process metadata tells that the topic partition is hosted in main process.
397+
* (2) The topic partition has never been associated with the server, and the incoming command is not START_CONSUMPTION
398+
* or REMOVE_PARTITION. For these two commands, it should by default be sent to isolated process. Here it needs to
399+
* make sure topic partition subscription request has never been issued to the server, so the check "isTopicPartitionHosted"
400+
* should be false in this case.
401+
*/
400402
if (isTopicPartitionHostedInMainProcess(topicName, partition)
401-
|| !isTopicPartitionHosted && command != START_CONSUMPTION) {
403+
|| (!isTopicPartitionHosted && command != START_CONSUMPTION && command != REMOVE_PARTITION)) {
402404
LOGGER.info("Executing command {} of topic: {}, partition: {} in main process.", command, topicName, partition);
403405
mainProcessCommandRunnable.run();
404406
return;

clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ public boolean isResourceSubscribed(String topicName, int partition) {
472472
return subscription.get();
473473
}
474474

475-
public void maybeSubscribeNewResource(String topicName, int partition) {
475+
public void maybeInitializeResourceHostingMetadata(String topicName, int partition) {
476476
getTopicPartitionSubscriptionMap().computeIfAbsent(topicName, s -> new VeniceConcurrentHashMap<>())
477477
.computeIfAbsent(partition, p -> new AtomicBoolean(true));
478478
}
@@ -715,27 +715,30 @@ private void initializeIsolatedIngestionServer() {
715715
metricsRepository,
716716
storeRepository,
717717
serverConfig.isUnregisterMetricForDeletedStoreEnabled());
718+
boolean isDaVinciClient = veniceMetadataRepositoryBuilder.isDaVinciClient();
719+
718720
/**
719-
* The reason of not to restore the data partitions during initialization of storage service is:
720-
* 1. During first fresh start up with no data on disk, we don't need to restore anything
721-
* 2. During fresh start up with data on disk (aka bootstrap), we will receive messages to subscribe to the partition
721+
* For isolated ingestion server, we always restore metadata partition, but we will only restore data partition in server,
722+
* not Da Vinci.
723+
* The reason of not to restore the data partitions during initialization of storage service in DVC is:
724+
* 1. During fresh start up with data on disk (aka bootstrap), we will receive messages to subscribe to the partition
722725
* and it will re-open the partition on demand.
723-
* 3. During crash recovery restart, partitions that are already ingestion will be opened by parent process and we
726+
* 2. During crash recovery restart, partitions that are already ingestion will be opened by parent process and we
724727
* should not try to open it. The remaining ingestion tasks will open the storage engines.
725-
* Also, we don't need to restore metadata partition here, as all subscribe requests sent to forked process will
726-
* automatically open the metadata partitions. Also, during Da Vinci bootstrap, main process will need to open the
727-
* metadata partition of storage engines in order to perform full cleanup of stale versions.
728+
*
729+
* The reason to restore data partitions during initialization of storage service in server is:
730+
* 1. Helix can issue OFFLINE->DROPPED state transition for stale partition in anytime, including server restart time.
731+
* If data partition is not restored here (as well as the main process), it will never be dropped so the RocksDB storage
732+
* will be leaking.
728733
*/
729-
boolean isDaVinciClient = veniceMetadataRepositoryBuilder.isDaVinciClient();
730-
731734
storageService = new StorageService(
732735
configLoader,
733736
storageEngineStats,
734737
rocksDBMemoryStats,
735738
storeVersionStateSerializer,
736739
partitionStateSerializer,
737740
storeRepository,
738-
false,
741+
!isDaVinciClient,
739742
true);
740743
storageService.start();
741744

clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServerHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private IngestionTaskReport handleIngestionTaskCommand(IngestionTaskCommand inge
162162
storeConfig.setRestoreDataPartitions(false);
163163
switch (ingestionCommandType) {
164164
case START_CONSUMPTION:
165-
isolatedIngestionServer.maybeSubscribeNewResource(topicName, partitionId);
165+
isolatedIngestionServer.maybeInitializeResourceHostingMetadata(topicName, partitionId);
166166
validateAndExecuteCommand(ingestionCommandType, report, () -> {
167167
ReadOnlyStoreRepository storeRepository = isolatedIngestionServer.getStoreRepository();
168168
// For subscription based store repository, we will need to subscribe to the store explicitly.
@@ -201,6 +201,12 @@ private IngestionTaskReport handleIngestionTaskCommand(IngestionTaskCommand inge
201201
isolatedIngestionServer.cleanupTopicState(topicName);
202202
break;
203203
case REMOVE_PARTITION:
204+
/**
205+
* For this command, we will try to initialize the isolated process metadata for the topic partition, if the
206+
* topic partition has never been associated with the server. This is needed as data partition is by default
207+
* restored in the isolated process, and thus it should be able to be removed by Helix command.
208+
*/
209+
isolatedIngestionServer.maybeInitializeResourceHostingMetadata(topicName, partitionId);
204210
validateAndExecuteCommand(ingestionCommandType, report, () -> {
205211
/**
206212
* Here we do not allow storage service to clean up "empty" storage engine. When ingestion isolation is turned on,

0 commit comments

Comments
 (0)