Skip to content

Commit

Permalink
[server][dvc] Increase graceful shutdown time for Da Vinci to reduce …
Browse files Browse the repository at this point in the history
…timeout exception (#1331)

For Da Vinci, when we close / retire a version, it will call shutdownIngestionTask API, which will try to unsubscribe and sync offset for graceful shutdown. Today the timeout is 10 seconds and it could timeout for topic with many partition assignment.
This will result in two kinds of exceptions:

When it is doing graceful shutdown and syncing offset, it may fail after the wait timeout as it will also remove storage engine
It may also failed to persist certain lingering data in drainer after timeout.
This PR tries to do minimal changes to reduce the noises in application that runs DVC by increasing the total timeout for per SIT shutdown wait.
  • Loading branch information
sixpluszero authored Dec 6, 2024
1 parent 03251d7 commit 6bbf149
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,9 +677,9 @@ public void startConsumption(VeniceStoreVersionConfig veniceStore, int partition
*/
public void shutdownStoreIngestionTask(String topicName) {
try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topicName)) {
if (topicNameToIngestionTaskMap.containsKey(topicName)) {
StoreIngestionTask storeIngestionTask = topicNameToIngestionTaskMap.remove(topicName);
storeIngestionTask.shutdown(10000);
StoreIngestionTask storeIngestionTask = topicNameToIngestionTaskMap.remove(topicName);
if (storeIngestionTask != null) {
storeIngestionTask.shutdownAndWait(30);
LOGGER.info("Successfully shut down ingestion task for {}", topicName);
} else {
LOGGER.info("Ignoring close request for not-existing consumption task {}", topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
Expand Down Expand Up @@ -347,6 +348,8 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected final ExecutorService parallelProcessingThreadPool;

protected final CountDownLatch gracefulShutdownLatch = new CountDownLatch(1);

public StoreIngestionTask(
StorageService storageService,
StoreIngestionTaskFactory.Builder builder,
Expand Down Expand Up @@ -1648,6 +1651,8 @@ public void run() {
*/
CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS);
}
// Release the latch after all the shutdown completes in DVC/Server.
getGracefulShutdownLatch().countDown();
} catch (VeniceIngestionTaskKilledException e) {
LOGGER.info("{} has been killed.", ingestionTaskName);
ingestionNotificationDispatcher.reportKilled(partitionConsumptionStateMap.values(), e);
Expand Down Expand Up @@ -3967,18 +3972,24 @@ public synchronized void close() {
* This method is a blocking call to wait for {@link StoreIngestionTask} for fully shutdown in the given time.
* @param waitTime Maximum wait time for the shutdown operation.
*/
public synchronized void shutdown(int waitTime) {
public void shutdownAndWait(int waitTime) {
long startTimeInMs = System.currentTimeMillis();
close();
try {
wait(waitTime);
if (!getGracefulShutdownLatch().await(waitTime, SECONDS)) {
LOGGER.warn(
"Unable to shutdown ingestion task of topic: {} gracefully in {}ms",
kafkaVersionTopic,
SECONDS.toMillis(waitTime));
} else {
LOGGER.info(
"Ingestion task of topic: {} is shutdown in {}ms",
kafkaVersionTopic,
LatencyUtils.getElapsedTimeFromMsToMs(startTimeInMs));
}
} catch (Exception e) {
LOGGER.error("Caught exception while waiting for ingestion task of topic: {} shutdown.", kafkaVersionTopic);
}
LOGGER.info(
"Ingestion task of topic: {} is shutdown in {}ms",
kafkaVersionTopic,
LatencyUtils.getElapsedTimeFromMsToMs(startTimeInMs));
}

/**
Expand Down Expand Up @@ -4445,6 +4456,10 @@ public boolean hasAllPartitionReportedCompleted() {
return true;
}

CountDownLatch getGracefulShutdownLatch() {
return gracefulShutdownLatch;
}

// For unit test purpose.
void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
this.versionRole = versionRole;
Expand Down

0 comments on commit 6bbf149

Please sign in to comment.