Skip to content

Commit 6022835

Browse files
garyrussellartembilan
authored andcommitted
GH-2197: Fix Container State After Fatal Stop
Resolves #2197 When a container is stopped due to any fatal error, `isInExpectedState()` should return false. **cherry-pick to 2.8.x, 2.7.x**
1 parent 1c98623 commit 6022835

File tree

3 files changed

+57
-25
lines changed

3 files changed

+57
-25
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
165165
private String clientIdSuffix;
166166

167167
private Runnable emergencyStop = () -> stopAbnormally(() -> {
168-
// NOSONAR
169168
});
170169

171170
private volatile ListenerConsumer listenerConsumer;
@@ -222,7 +221,7 @@ public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consu
222221
}
223222

224223
/**
225-
* Set a {@link Runnable} to call whenever an {@link Error} occurs on a listener
224+
* Set a {@link Runnable} to call whenever a fatal error occurs on the listener
226225
* thread.
227226
* @param emergencyStop the Runnable.
228227
* @since 2.2.1
@@ -1275,11 +1274,8 @@ public void run() {
12751274
exitThrowable = e;
12761275
}
12771276
catch (Error e) { // NOSONAR - rethrown
1278-
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
1279-
if (runnable != null) {
1280-
runnable.run();
1281-
}
12821277
this.logger.error(e, "Stopping container due to an Error");
1278+
this.fatalError = true;
12831279
wrapUp(e);
12841280
throw e;
12851281
}
@@ -1740,8 +1736,10 @@ private void wrapUp(@Nullable Throwable throwable) {
17401736
}
17411737
}
17421738
else {
1743-
this.logger.error("Fatal consumer exception; stopping container");
1744-
KafkaMessageListenerContainer.this.stop(false);
1739+
if (!(throwable instanceof Error)) {
1740+
this.logger.error("Fatal consumer exception; stopping container");
1741+
}
1742+
KafkaMessageListenerContainer.this.emergencyStop.run();
17451743
}
17461744
this.monitorTask.cancel(true);
17471745
if (!this.taskSchedulerExplicitlySet) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+49-16
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.springframework.kafka.event.ConsumerStoppedEvent;
100100
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
101101
import org.springframework.kafka.event.ConsumerStoppingEvent;
102+
import org.springframework.kafka.event.ContainerStoppedEvent;
102103
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
103104
import org.springframework.kafka.listener.ContainerProperties.AckMode;
104105
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
@@ -3068,38 +3069,63 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
30683069
container.stop();
30693070
}
30703071

3071-
@SuppressWarnings({ "unchecked", "rawtypes" })
30723072
@Test
3073-
void testFatalErrorOnAuthenticationException() throws Exception {
3073+
void testFatalErrorOnAuthenticationException() throws InterruptedException {
30743074
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3075-
Consumer<Integer, String> consumer = mock(Consumer.class);
3076-
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3077-
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
3078-
3079-
willThrow(AuthenticationException.class)
3080-
.given(consumer).poll(any());
3081-
30823075
ContainerProperties containerProps = new ContainerProperties(topic1);
30833076
containerProps.setGroupId("grp");
30843077
containerProps.setClientId("clientId");
30853078
containerProps.setMessageListener((MessageListener) r -> { });
30863079
KafkaMessageListenerContainer<Integer, String> container =
30873080
new KafkaMessageListenerContainer<>(cf, containerProps);
3081+
testFatalErrorOnAuthenticationException(container, cf);
3082+
}
3083+
3084+
@Test
3085+
void testFatalErrorOnAuthenticationExceptionConcurrent() throws InterruptedException {
3086+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3087+
ContainerProperties containerProps = new ContainerProperties(topic1);
3088+
containerProps.setGroupId("grp");
3089+
containerProps.setClientId("clientId");
3090+
containerProps.setMessageListener((MessageListener) r -> { });
3091+
ConcurrentMessageListenerContainer<Integer, String> container =
3092+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
3093+
testFatalErrorOnAuthenticationException(container, cf);
3094+
}
3095+
3096+
@SuppressWarnings({ "unchecked", "rawtypes" })
3097+
private void testFatalErrorOnAuthenticationException(AbstractMessageListenerContainer container,
3098+
ConsumerFactory<Integer, String> cf) throws InterruptedException {
3099+
3100+
Consumer<Integer, String> consumer = mock(Consumer.class);
3101+
given(cf.createConsumer(eq("grp"), eq("clientId"),
3102+
container instanceof ConcurrentMessageListenerContainer ? eq("-0") : isNull(), any()))
3103+
.willReturn(consumer);
3104+
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
3105+
3106+
willThrow(AuthenticationException.class)
3107+
.given(consumer).poll(any());
30883108

30893109
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
3090-
CountDownLatch stopped = new CountDownLatch(1);
3110+
CountDownLatch consumerStopped = new CountDownLatch(1);
3111+
CountDownLatch containerStopped = new CountDownLatch(1);
30913112

30923113
container.setApplicationEventPublisher(e -> {
30933114
if (e instanceof ConsumerStoppedEvent) {
30943115
reason.set(((ConsumerStoppedEvent) e).getReason());
3095-
stopped.countDown();
3116+
consumerStopped.countDown();
3117+
}
3118+
else if (e instanceof ContainerStoppedEvent) {
3119+
containerStopped.countDown();
30963120
}
30973121
});
30983122

30993123
container.start();
31003124
try {
3101-
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3125+
assertThat(consumerStopped.await(10, TimeUnit.SECONDS)).isTrue();
31023126
assertThat(reason.get()).isEqualTo(Reason.AUTH);
3127+
assertThat(containerStopped.await(10, TimeUnit.SECONDS)).isTrue();
3128+
assertThat(container.isInExpectedState()).isFalse();
31033129
}
31043130
finally {
31053131
container.stop();
@@ -3125,18 +3151,23 @@ void testFatalErrorOnAuthorizationException() throws Exception {
31253151
new KafkaMessageListenerContainer<>(cf, containerProps);
31263152

31273153
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
3128-
CountDownLatch stopped = new CountDownLatch(1);
3154+
CountDownLatch consumerStopped = new CountDownLatch(1);
3155+
CountDownLatch containerStopped = new CountDownLatch(1);
31293156

31303157
container.setApplicationEventPublisher(e -> {
31313158
if (e instanceof ConsumerStoppedEvent) {
31323159
reason.set(((ConsumerStoppedEvent) e).getReason());
3133-
stopped.countDown();
3160+
consumerStopped.countDown();
3161+
}
3162+
else if (e instanceof ContainerStoppedEvent) {
3163+
containerStopped.countDown();
31343164
}
31353165
});
31363166

31373167
container.start();
3138-
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3168+
assertThat(consumerStopped.await(10, TimeUnit.SECONDS)).isTrue();
31393169
assertThat(reason.get()).isEqualTo(Reason.AUTH);
3170+
assertThat(container.isInExpectedState()).isFalse();
31403171
container.stop();
31413172
}
31423173

@@ -3163,6 +3194,7 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
31633194
container.start();
31643195
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
31653196
container.stop();
3197+
assertThat(container.isInExpectedState()).isTrue();
31663198
}
31673199

31683200
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -3186,13 +3218,14 @@ void testFatalErrorOnFencedInstanceException() throws Exception {
31863218
CountDownLatch stopped = new CountDownLatch(1);
31873219

31883220
container.setApplicationEventPublisher(e -> {
3189-
if (e instanceof ConsumerStoppedEvent) {
3221+
if (e instanceof ContainerStoppedEvent) {
31903222
stopped.countDown();
31913223
}
31923224
});
31933225

31943226
container.start();
31953227
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3228+
assertThat(container.isInExpectedState()).isFalse();
31963229
container.stop();
31973230
}
31983231

spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -127,6 +127,7 @@ public void testOOMKMLC() throws Exception {
127127
container.start();
128128
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
129129
assertThat(container.isRunning()).isFalse();
130+
assertThat(container.isInExpectedState()).isFalse();
130131
}
131132

132133
}

0 commit comments

Comments
 (0)