diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index f1ca924b7..3fcafbc67 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -22,11 +22,12 @@ import static io.lettuce.core.event.cluster.AdaptiveRefreshTriggeredEvent.*; import java.time.Duration; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import io.lettuce.core.ClientOptions; @@ -65,6 +66,10 @@ class ClusterTopologyRefreshScheduler implements Runnable, ClusterEventListener private final EventExecutorGroup genericWorkerPool; + private static final ReentrantLock refreshLock = new ReentrantLock(); + + private static final Condition refreshComplete = refreshLock.newCondition(); + ClusterTopologyRefreshScheduler(Supplier clientOptions, Supplier partitions, Supplier> refreshTopology, ClientResources clientResources) { @@ -94,33 +99,33 @@ protected void activateTopologyRefreshIfNeeded() { /** * Suspend (cancel) periodic topology refresh. */ - public CompletableFuture suspendTopologyRefresh() { - CompletableFuture completionFuture = new CompletableFuture<>(); + public void suspendTopologyRefresh() { if (clusterTopologyRefreshActivated.compareAndSet(true, false)) { + ScheduledFuture scheduledFuture = clusterTopologyRefreshFuture.get(); try { - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - clusterTopologyRefreshFuture.set(null); - } - completionFuture.complete(null); + scheduledFuture.cancel(false); + clusterTopologyRefreshFuture.set(null); } catch (Exception e) { logger.debug("Could not cancel Cluster topology refresh", e); - completionFuture.completeExceptionally(e); } - } else { - completionFuture.complete(null); } - - return completionFuture; } public boolean isTopologyRefreshInProgress() { return clusterTopologyRefreshTask.get(); } + public ReentrantLock getRefreshLock() { + return refreshLock; + } + + public Condition getRefreshComplete() { + return refreshComplete; + } + @Override public void run() { @@ -332,13 +337,18 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements public void run() { - if (compareAndSet(false, true)) { - doRun(); - return; - } + refreshLock.lock(); + try { + if (compareAndSet(false, true)) { + doRun(); + return; + } - if (logger.isDebugEnabled()) { - logger.debug("ClusterTopologyRefreshTask already in progress"); + if (logger.isDebugEnabled()) { + logger.debug("ClusterTopologyRefreshTask already in progress"); + } + } finally { + refreshLock.unlock(); } } @@ -354,7 +364,13 @@ void doRun() { logger.warn("Cannot refresh Redis Cluster topology", throwable); } - set(false); + refreshLock.lock(); + try { + set(false); + refreshComplete.signalAll(); + } finally { + refreshLock.unlock(); + } }); } catch (Exception e) { logger.warn("Cannot refresh Redis Cluster topology", e); diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index e047717bc..02c6533f8 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -33,6 +33,8 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -968,8 +970,8 @@ public CompletionStage refreshPartitionsAsync() { * * @since 6.3 */ - public CompletableFuture suspendTopologyRefresh() { - return topologyRefreshScheduler.suspendTopologyRefresh(); + public void suspendTopologyRefresh() { + topologyRefreshScheduler.suspendTopologyRefresh(); } /** @@ -1151,7 +1153,24 @@ public void setPartitions(Partitions partitions) { @Override public CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) { - return suspendTopologyRefresh().thenCompose(voidResult -> super.shutdownAsync(quietPeriod, timeout, timeUnit)); + suspendTopologyRefresh(); + ReentrantLock refreshLock = topologyRefreshScheduler.getRefreshLock(); + Condition refreshComplete = topologyRefreshScheduler.getRefreshComplete(); + + refreshLock.lock(); + try { + while (topologyRefreshScheduler.isTopologyRefreshInProgress()) { + try { + refreshComplete.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } finally { + refreshLock.unlock(); + } + + return super.shutdownAsync(quietPeriod, timeout, timeUnit); } // -------------------------------------------------------------------------