diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index b47b386d2..0a8ae570d 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import io.lettuce.core.ClientOptions; @@ -64,6 +66,10 @@ class ClusterTopologyRefreshScheduler implements Runnable, ClusterEventListener private final EventExecutorGroup genericWorkerPool; + private static final ReentrantLock refreshLock = new ReentrantLock(); + + private static final Condition refreshComplete = refreshLock.newCondition(); + ClusterTopologyRefreshScheduler(Supplier clientOptions, Supplier partitions, Supplier> refreshTopology, ClientResources clientResources) { @@ -112,6 +118,14 @@ public boolean isTopologyRefreshInProgress() { return clusterTopologyRefreshTask.get(); } + public ReentrantLock getRefreshLock() { + return refreshLock; + } + + public Condition getRefreshComplete() { + return refreshComplete; + } + @Override public void run() { @@ -345,7 +359,15 @@ void doRun() { logger.warn("Cannot refresh Redis Cluster topology", throwable); } - set(false); + refreshLock.lock(); + try { + reloadTopologyAsync.get(); + + set(false); + refreshComplete.signalAll(); + } finally { + refreshLock.unlock(); + } }); } catch (Exception e) { logger.warn("Cannot refresh Redis Cluster topology", e); diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 31125d93c..c3d5bc8d8 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -33,6 +33,8 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -1152,8 +1154,13 @@ public void setPartitions(Partitions partitions) { public CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) { suspendTopologyRefresh(); - - return super.shutdownAsync(quietPeriod, timeout, timeUnit); + ReentrantLock refreshLock = topologyRefreshScheduler.getRefreshLock(); + refreshLock.lock(); + try { + return super.shutdownAsync(quietPeriod, timeout, timeUnit); + } finally { + refreshLock.unlock(); + } } // -------------------------------------------------------------------------