Skip to content

Commit f06ef14

Browse files
committed
Addressing PR review
1 parent ec59f25 commit f06ef14

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.commons.logging.LogFactory;
2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
24-
import org.jspecify.annotations.NonNull;
2524
import org.jspecify.annotations.Nullable;
2625

2726
import org.springframework.beans.BeanUtils;
@@ -48,6 +47,7 @@
4847
* @param <V> the value type
4948
*
5049
* @author Soby Chacko
50+
* @since 4.0
5151
*/
5252
public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
5353
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
@@ -58,7 +58,6 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
5858
*/
5959
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;
6060

61-
@NonNull
6261
protected final ShareConsumerFactory<K, V> shareConsumerFactory;
6362

6463
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
@@ -67,7 +66,6 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
6766

6867
protected final ReentrantLock lifecycleLock = new ReentrantLock();
6968

70-
@NonNull
7169
private String beanName = "noBeanNameSet";
7270

7371
@Nullable

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Arrays;
1920
import java.util.Collections;
2021
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
@@ -49,10 +50,13 @@
4950
* @param <V> the value type
5051
*
5152
* @author Soby Chacko
53+
* @since 4.0
5254
*/
5355
public class ShareKafkaMessageListenerContainer<K, V>
5456
extends AbstractShareKafkaMessageListenerContainer<K, V> {
5557

58+
private static final int POLL_TIMEOUT = 1000;
59+
5660
@Nullable
5761
private String clientId;
5862

@@ -166,11 +170,12 @@ private class ShareListenerConsumer implements Runnable {
166170
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
167171
ShareKafkaMessageListenerContainer.this.getGroupId(),
168172
ShareKafkaMessageListenerContainer.this.getClientId());
173+
169174
this.genericListener = listener;
170175
this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
171176
// Subscribe to topics, just like in the test
172177
ContainerProperties containerProperties = getContainerProperties();
173-
this.consumer.subscribe(java.util.Arrays.asList(containerProperties.getTopics()));
178+
this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
174179
}
175180

176181
@Nullable
@@ -184,7 +189,7 @@ public void run() {
184189
Throwable exitThrowable = null;
185190
while (isRunning()) {
186191
try {
187-
var records = this.consumer.poll(java.time.Duration.ofMillis(1000));
192+
var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT));
188193
if (records != null && records.count() > 0) {
189194
for (var record : records) {
190195
@SuppressWarnings("unchecked")
@@ -199,6 +204,7 @@ public void run() {
199204
}
200205
catch (Error e) {
201206
this.logger.error(e, "Stopping share consumer due to an Error");
207+
wrapUp();
202208
throw e;
203209
}
204210
catch (Exception e) {
@@ -213,15 +219,19 @@ public void run() {
213219
if (exitThrowable != null) {
214220
this.logger.error(exitThrowable, "ShareListenerConsumer exiting due to error");
215221
}
216-
this.consumer.close();
217-
this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
222+
wrapUp();
218223
}
219224

220225
protected void initialize() {
221226
publishConsumerStartingEvent();
222227
publishConsumerStartedEvent();
223228
}
224229

230+
private void wrapUp(){
231+
this.consumer.close();
232+
this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
233+
}
234+
225235
@Override
226236
public String toString() {
227237
return "ShareKafkaMessageListenerContainer.ShareListenerConsumer ["

0 commit comments

Comments
 (0)