Skip to content

Commit 370a172

Browse files
committed
Implement lock mechanism
1 parent 2f38c3c commit 370a172

File tree

2 files changed

+58
-23
lines changed

2 files changed

+58
-23
lines changed

src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
import static io.lettuce.core.event.cluster.AdaptiveRefreshTriggeredEvent.*;
2323

2424
import java.time.Duration;
25-
import java.util.concurrent.CompletableFuture;
2625
import java.util.concurrent.CompletionStage;
2726
import java.util.concurrent.TimeUnit;
2827
import java.util.concurrent.atomic.AtomicBoolean;
2928
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.concurrent.locks.Condition;
30+
import java.util.concurrent.locks.ReentrantLock;
3031
import java.util.function.Supplier;
3132

3233
import io.lettuce.core.ClientOptions;
@@ -65,6 +66,10 @@ class ClusterTopologyRefreshScheduler implements Runnable, ClusterEventListener
6566

6667
private final EventExecutorGroup genericWorkerPool;
6768

69+
private static final ReentrantLock refreshLock = new ReentrantLock();
70+
71+
private static final Condition refreshComplete = refreshLock.newCondition();
72+
6873
ClusterTopologyRefreshScheduler(Supplier<ClusterClientOptions> clientOptions, Supplier<Partitions> partitions,
6974
Supplier<CompletionStage<?>> refreshTopology, ClientResources clientResources) {
7075

@@ -94,33 +99,33 @@ protected void activateTopologyRefreshIfNeeded() {
9499
/**
95100
* Suspend (cancel) periodic topology refresh.
96101
*/
97-
public CompletableFuture<Void> suspendTopologyRefresh() {
98-
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
102+
public void suspendTopologyRefresh() {
99103

100104
if (clusterTopologyRefreshActivated.compareAndSet(true, false)) {
105+
101106
ScheduledFuture<?> scheduledFuture = clusterTopologyRefreshFuture.get();
102107

103108
try {
104-
if (scheduledFuture != null) {
105-
scheduledFuture.cancel(false);
106-
clusterTopologyRefreshFuture.set(null);
107-
}
108-
completionFuture.complete(null);
109+
scheduledFuture.cancel(false);
110+
clusterTopologyRefreshFuture.set(null);
109111
} catch (Exception e) {
110112
logger.debug("Could not cancel Cluster topology refresh", e);
111-
completionFuture.completeExceptionally(e);
112113
}
113-
} else {
114-
completionFuture.complete(null);
115114
}
116-
117-
return completionFuture;
118115
}
119116

120117
public boolean isTopologyRefreshInProgress() {
121118
return clusterTopologyRefreshTask.get();
122119
}
123120

121+
public ReentrantLock getRefreshLock() {
122+
return refreshLock;
123+
}
124+
125+
public Condition getRefreshComplete() {
126+
return refreshComplete;
127+
}
128+
124129
@Override
125130
public void run() {
126131

@@ -332,13 +337,18 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements
332337

333338
public void run() {
334339

335-
if (compareAndSet(false, true)) {
336-
doRun();
337-
return;
338-
}
340+
refreshLock.lock();
341+
try {
342+
if (compareAndSet(false, true)) {
343+
doRun();
344+
return;
345+
}
339346

340-
if (logger.isDebugEnabled()) {
341-
logger.debug("ClusterTopologyRefreshTask already in progress");
347+
if (logger.isDebugEnabled()) {
348+
logger.debug("ClusterTopologyRefreshTask already in progress");
349+
}
350+
} finally {
351+
refreshLock.unlock();
342352
}
343353
}
344354

@@ -354,7 +364,13 @@ void doRun() {
354364
logger.warn("Cannot refresh Redis Cluster topology", throwable);
355365
}
356366

357-
set(false);
367+
refreshLock.lock();
368+
try {
369+
set(false);
370+
refreshComplete.signalAll();
371+
} finally {
372+
refreshLock.unlock();
373+
}
358374
});
359375
} catch (Exception e) {
360376
logger.warn("Cannot refresh Redis Cluster topology", e);

src/main/java/io/lettuce/core/cluster/RedisClusterClient.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.concurrent.CompletionStage;
3434
import java.util.concurrent.ExecutionException;
3535
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.locks.Condition;
37+
import java.util.concurrent.locks.ReentrantLock;
3638
import java.util.function.Consumer;
3739
import java.util.function.Function;
3840
import java.util.function.Predicate;
@@ -968,8 +970,8 @@ public CompletionStage<Void> refreshPartitionsAsync() {
968970
*
969971
* @since 6.3
970972
*/
971-
public CompletableFuture<Void> suspendTopologyRefresh() {
972-
return topologyRefreshScheduler.suspendTopologyRefresh();
973+
public void suspendTopologyRefresh() {
974+
topologyRefreshScheduler.suspendTopologyRefresh();
973975
}
974976

975977
/**
@@ -1151,7 +1153,24 @@ public void setPartitions(Partitions partitions) {
11511153
@Override
11521154
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {
11531155

1154-
return suspendTopologyRefresh().thenCompose(voidResult -> super.shutdownAsync(quietPeriod, timeout, timeUnit));
1156+
suspendTopologyRefresh();
1157+
ReentrantLock refreshLock = topologyRefreshScheduler.getRefreshLock();
1158+
Condition refreshComplete = topologyRefreshScheduler.getRefreshComplete();
1159+
1160+
refreshLock.lock();
1161+
try {
1162+
while (topologyRefreshScheduler.isTopologyRefreshInProgress()) {
1163+
try {
1164+
refreshComplete.await();
1165+
} catch (InterruptedException e) {
1166+
Thread.currentThread().interrupt();
1167+
}
1168+
}
1169+
} finally {
1170+
refreshLock.unlock();
1171+
}
1172+
1173+
return super.shutdownAsync(quietPeriod, timeout, timeUnit);
11551174
}
11561175

11571176
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)