Skip to content

Commit 2e29a99

Browse files
mp911dechristophstrobl
authored andcommitted
Differentiate between initial exception handling, recovery and recovery after subscription.
We now differentiate exception handling regarding the recovery state. Initial listen fails if the connection is unavailable. Upon recovery after a preceeding subscription we now log the success to create a counterpart to our error logging. Closes: #2782 Original Pull Request: #2808
1 parent da44aa2 commit 2e29a99

File tree

2 files changed

+147
-15
lines changed

2 files changed

+147
-15
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

+77-15
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ private void lazyListen() {
370370
State state = this.state.get();
371371

372372
CompletableFuture<Void> futureToAwait = state.isPrepareListening() ? containerListenFuture
373-
: lazyListen(this.backOff.start());
373+
: lazyListen(new InitialBackoffExecution(this.backOff.start()));
374374

375375
try {
376376
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
@@ -531,8 +531,7 @@ private void awaitRegistrationTime(CompletableFuture<Void> future) {
531531
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
532532
} catch (InterruptedException ex) {
533533
Thread.currentThread().interrupt();
534-
} catch (ExecutionException | TimeoutException ignore) {
535-
}
534+
} catch (ExecutionException | TimeoutException ignore) {}
536535
}
537536

538537
@Override
@@ -876,7 +875,7 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
876875

877876
if (recoveryInterval != BackOffExecution.STOP) {
878877
String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms",
879-
cause, recoveryInterval);
878+
cause, recoveryInterval);
880879
logger.error(message, cause);
881880
}
882881

@@ -885,8 +884,13 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
885884

886885
Runnable recoveryFunction = () -> {
887886

888-
CompletableFuture<Void> lazyListen = lazyListen(backOffExecution);
889-
lazyListen.whenComplete(propagate(future));
887+
CompletableFuture<Void> lazyListen = lazyListen(new RecoveryBackoffExecution(backOffExecution));
888+
lazyListen.whenComplete(propagate(future)).thenRun(() -> {
889+
890+
if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution) {
891+
logger.info("Subscription(s) recovered");
892+
}
893+
});
890894
};
891895

892896
if (potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
@@ -980,7 +984,7 @@ private boolean hasTopics() {
980984
private Subscriber getRequiredSubscriber() {
981985

982986
Assert.state(this.subscriber != null,
983-
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
987+
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called");
984988

985989
return this.subscriber;
986990
}
@@ -1018,6 +1022,54 @@ private void logTrace(Supplier<String> message) {
10181022
}
10191023
}
10201024

1025+
BackOffExecution nextBackoffExecution(BackOffExecution backOffExecution, boolean subscribed) {
1026+
1027+
if (subscribed) {
1028+
return new RecoveryAfterSubscriptionBackoffExecution(backOff.start());
1029+
}
1030+
1031+
return backOffExecution;
1032+
}
1033+
1034+
/**
1035+
* Marker for an initial backoff.
1036+
*
1037+
* @param delegate
1038+
*/
1039+
record InitialBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
1040+
1041+
@Override
1042+
public long nextBackOff() {
1043+
return delegate.nextBackOff();
1044+
}
1045+
}
1046+
1047+
/**
1048+
* Marker for a recovery after a subscription has been active previously.
1049+
*
1050+
* @param delegate
1051+
*/
1052+
record RecoveryAfterSubscriptionBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
1053+
1054+
@Override
1055+
public long nextBackOff() {
1056+
return delegate.nextBackOff();
1057+
}
1058+
}
1059+
1060+
/**
1061+
* Marker for a recovery execution.
1062+
*
1063+
* @param delegate
1064+
*/
1065+
record RecoveryBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
1066+
1067+
@Override
1068+
public long nextBackOff() {
1069+
return delegate.nextBackOff();
1070+
}
1071+
}
1072+
10211073
/**
10221074
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
10231075
* {@code channel or pattern}, and {@code count} and returns no result.
@@ -1191,18 +1243,23 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
11911243
if (connection.isSubscribed()) {
11921244

11931245
initFuture.completeExceptionally(
1194-
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
1246+
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
11951247

11961248
return initFuture;
11971249
}
11981250

11991251
try {
12001252
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
12011253
} catch (Throwable t) {
1202-
handleSubscriptionException(initFuture, backOffExecution, t);
1254+
handleSubscriptionException(initFuture, nextBackoffExecution(backOffExecution, connection.isSubscribed()),
1255+
t);
12031256
}
12041257
} catch (RuntimeException ex) {
1205-
initFuture.completeExceptionally(ex);
1258+
if (backOffExecution instanceof InitialBackoffExecution) {
1259+
initFuture.completeExceptionally(ex);
1260+
} else {
1261+
handleSubscriptionException(initFuture, backOffExecution, ex);
1262+
}
12061263
}
12071264

12081265
return initFuture;
@@ -1215,8 +1272,9 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
12151272
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
12161273
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
12171274

1218-
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
1219-
() -> subscriptionDone.complete(null)));
1275+
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> {
1276+
subscriptionDone.complete(null);
1277+
}));
12201278

12211279
doSubscribe(connection, patterns, channels);
12221280
}
@@ -1381,7 +1439,10 @@ private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]
13811439
}
13821440

13831441
private void doInLock(Runnable runner) {
1384-
doInLock(() -> { runner.run(); return null; });
1442+
doInLock(() -> {
1443+
runner.run();
1444+
return null;
1445+
});
13851446
}
13861447

13871448
private <T> T doInLock(Supplier<T> supplier) {
@@ -1432,7 +1493,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14321493
try {
14331494
subscribeChannel(channels.toArray(new byte[0][]));
14341495
} catch (Exception ex) {
1435-
handleSubscriptionException(subscriptionDone, backOffExecution, ex);
1496+
handleSubscriptionException(subscriptionDone, nextBackoffExecution(backOffExecution, true), ex);
14361497
}
14371498
}));
14381499
} else {
@@ -1449,7 +1510,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14491510
closeConnection();
14501511
unsubscribeFuture.complete(null);
14511512
} catch (Throwable cause) {
1452-
handleSubscriptionException(subscriptionDone, backOffExecution, cause);
1513+
handleSubscriptionException(subscriptionDone,
1514+
nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause);
14531515
}
14541516
});
14551517
}

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

+70
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.nio.charset.StandardCharsets;
22+
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.Executor;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
2326

2427
import org.junit.jupiter.api.BeforeEach;
2528
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2631
import org.springframework.core.task.SyncTaskExecutor;
2732
import org.springframework.data.redis.RedisConnectionFailureException;
2833
import org.springframework.data.redis.connection.RedisConnection;
@@ -32,6 +37,7 @@
3237
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
3338
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
3439
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
40+
import org.springframework.util.backoff.FixedBackOff;
3541

3642
/**
3743
* Unit tests for {@link RedisMessageListenerContainer}.
@@ -147,6 +153,70 @@ void containerListenShouldReportFailureOnRedisUnavailability() {
147153
assertThat(container.isListening()).isFalse();
148154
}
149155

156+
@Test // GH-2335
157+
void shouldRecoverFromConnectionFailure() throws Exception {
158+
159+
AtomicInteger requestCount = new AtomicInteger();
160+
AtomicBoolean shouldThrowSubscriptionException = new AtomicBoolean();
161+
162+
container = new RedisMessageListenerContainer();
163+
container.setConnectionFactory(connectionFactoryMock);
164+
container.setBeanName("container");
165+
container.setTaskExecutor(new SyncTaskExecutor());
166+
container.setSubscriptionExecutor(new SimpleAsyncTaskExecutor());
167+
container.setMaxSubscriptionRegistrationWaitingTime(1000);
168+
container.setRecoveryBackoff(new FixedBackOff(1, 5));
169+
container.afterPropertiesSet();
170+
171+
doAnswer(it -> {
172+
173+
int req = requestCount.incrementAndGet();
174+
if (req == 1 || req == 3) {
175+
return connectionMock;
176+
}
177+
178+
throw new RedisConnectionFailureException("Booh");
179+
}).when(connectionFactoryMock).getConnection();
180+
181+
CountDownLatch exceptionWait = new CountDownLatch(1);
182+
CountDownLatch armed = new CountDownLatch(1);
183+
CountDownLatch recoveryArmed = new CountDownLatch(1);
184+
185+
doAnswer(it -> {
186+
187+
SubscriptionListener listener = it.getArgument(0);
188+
when(connectionMock.isSubscribed()).thenReturn(true);
189+
190+
listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);
191+
192+
armed.countDown();
193+
exceptionWait.await();
194+
195+
if (shouldThrowSubscriptionException.compareAndSet(true, false)) {
196+
when(connectionMock.isSubscribed()).thenReturn(false);
197+
throw new RedisConnectionFailureException("Disconnected");
198+
}
199+
200+
recoveryArmed.countDown();
201+
202+
return null;
203+
}).when(connectionMock).subscribe(any(), any());
204+
205+
container.start();
206+
container.addMessageListener(new MessageListenerAdapter(handler), new ChannelTopic("a"));
207+
armed.await();
208+
209+
// let an exception happen
210+
shouldThrowSubscriptionException.set(true);
211+
exceptionWait.countDown();
212+
213+
// wait for subscription recovery
214+
recoveryArmed.await();
215+
216+
assertThat(recoveryArmed.getCount()).isZero();
217+
218+
}
219+
150220
@Test // GH-964
151221
void failsOnDuplicateInit() {
152222
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());

0 commit comments

Comments
 (0)