Skip to content

Commit ec59f25

Browse files
committed
Add Kafka shared consumer container support
- New AbstractShareKafkaMessageListenerContainer base class with lifecycle management - ShareKafkaMessageListenerContainer implementation for share consumer protocol - Integration tests for end-to-end message delivery validation Signed-off-by: Soby Chacko <[email protected]>
1 parent 045ea9a commit ec59f25

File tree

3 files changed

+607
-0
lines changed

3 files changed

+607
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.concurrent.locks.ReentrantLock;
20+
import java.util.regex.Pattern;
21+
22+
import org.apache.commons.logging.LogFactory;
23+
import org.apache.kafka.clients.consumer.ConsumerConfig;
24+
import org.jspecify.annotations.NonNull;
25+
import org.jspecify.annotations.Nullable;
26+
27+
import org.springframework.beans.BeanUtils;
28+
import org.springframework.beans.BeansException;
29+
import org.springframework.beans.factory.BeanNameAware;
30+
import org.springframework.context.ApplicationContext;
31+
import org.springframework.context.ApplicationContextAware;
32+
import org.springframework.context.ApplicationEventPublisher;
33+
import org.springframework.context.ApplicationEventPublisherAware;
34+
import org.springframework.core.log.LogAccessor;
35+
import org.springframework.kafka.core.ShareConsumerFactory;
36+
import org.springframework.kafka.support.TopicPartitionOffset;
37+
import org.springframework.util.Assert;
38+
39+
/**
40+
* Abstract base class for share consumer message listener containers.
41+
* <p>
42+
* Handles common lifecycle, configuration, and event publishing for containers using a
43+
* {@link org.springframework.kafka.core.ShareConsumerFactory}.
44+
* <p>
45+
* Subclasses are responsible for implementing the actual consumer loop and message dispatch logic.
46+
*
47+
* @param <K> the key type
48+
* @param <V> the value type
49+
*
50+
* @author Soby Chacko
51+
*/
52+
public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
53+
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
54+
ApplicationContextAware {
55+
56+
/**
57+
* The default {@link org.springframework.context.SmartLifecycle} phase for listener containers.
58+
*/
59+
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;
60+
61+
@NonNull
62+
protected final ShareConsumerFactory<K, V> shareConsumerFactory;
63+
64+
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
65+
66+
private final ContainerProperties containerProperties;
67+
68+
protected final ReentrantLock lifecycleLock = new ReentrantLock();
69+
70+
@NonNull
71+
private String beanName = "noBeanNameSet";
72+
73+
@Nullable
74+
private ApplicationEventPublisher applicationEventPublisher;
75+
76+
private boolean autoStartup = true;
77+
78+
private int phase = DEFAULT_PHASE;
79+
80+
@Nullable
81+
private ApplicationContext applicationContext;
82+
83+
private volatile boolean running = false;
84+
85+
/**
86+
* Construct an instance with the provided factory and properties.
87+
* @param shareConsumerFactory the factory.
88+
* @param containerProperties the properties.
89+
*/
90+
@SuppressWarnings("unchecked")
91+
protected AbstractShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
92+
ContainerProperties containerProperties) {
93+
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
94+
Assert.notNull(shareConsumerFactory, "'shareConsumerFactory' cannot be null");
95+
this.shareConsumerFactory = (ShareConsumerFactory<K, V>) shareConsumerFactory;
96+
@Nullable String @Nullable [] topics = containerProperties.getTopics();
97+
if (topics != null) {
98+
this.containerProperties = new ContainerProperties(topics);
99+
}
100+
else {
101+
Pattern topicPattern = containerProperties.getTopicPattern();
102+
if (topicPattern != null) {
103+
this.containerProperties = new ContainerProperties(topicPattern);
104+
}
105+
else {
106+
@Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
107+
if (topicPartitions != null) {
108+
this.containerProperties = new ContainerProperties(topicPartitions);
109+
}
110+
else {
111+
throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided");
112+
}
113+
}
114+
}
115+
BeanUtils.copyProperties(containerProperties, this.containerProperties,
116+
"topics", "topicPartitions", "topicPattern");
117+
}
118+
119+
@Override
120+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
121+
this.applicationContext = applicationContext;
122+
}
123+
124+
@Nullable
125+
public ApplicationContext getApplicationContext() {
126+
return this.applicationContext;
127+
}
128+
129+
@Override
130+
public void setBeanName(String name) {
131+
this.beanName = name;
132+
}
133+
134+
/**
135+
* Return the bean name.
136+
* @return the bean name
137+
*/
138+
public String getBeanName() {
139+
return this.beanName;
140+
}
141+
142+
@Override
143+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
144+
this.applicationEventPublisher = applicationEventPublisher;
145+
}
146+
147+
/**
148+
* Get the event publisher.
149+
* @return the publisher
150+
*/
151+
@Nullable
152+
public ApplicationEventPublisher getApplicationEventPublisher() {
153+
return this.applicationEventPublisher;
154+
}
155+
156+
@Override
157+
public boolean isAutoStartup() {
158+
return this.autoStartup;
159+
}
160+
161+
@Override
162+
public void setAutoStartup(boolean autoStartup) {
163+
this.autoStartup = autoStartup;
164+
}
165+
166+
@Override
167+
public int getPhase() {
168+
return this.phase;
169+
}
170+
171+
public void setPhase(int phase) {
172+
this.phase = phase;
173+
}
174+
175+
@Override
176+
public void stop(Runnable callback) {
177+
stop();
178+
callback.run();
179+
}
180+
181+
@Override
182+
public void start() {
183+
this.lifecycleLock.lock();
184+
try {
185+
if (!isRunning()) {
186+
Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
187+
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
188+
doStart();
189+
}
190+
}
191+
finally {
192+
this.lifecycleLock.unlock();
193+
}
194+
}
195+
196+
@Override
197+
public void stop() {
198+
this.lifecycleLock.lock();
199+
try {
200+
if (isRunning()) {
201+
doStop();
202+
}
203+
}
204+
finally {
205+
this.lifecycleLock.unlock();
206+
}
207+
}
208+
209+
@Override
210+
public boolean isRunning() {
211+
return this.running;
212+
}
213+
214+
protected void setRunning(boolean running) {
215+
this.running = running;
216+
}
217+
218+
@Override
219+
public ContainerProperties getContainerProperties() {
220+
return this.containerProperties;
221+
}
222+
223+
@Override
224+
@Nullable
225+
public String getGroupId() {
226+
return this.containerProperties.getGroupId() == null
227+
? (String) this.shareConsumerFactory.getConfigurationProperties().get(ConsumerConfig.GROUP_ID_CONFIG)
228+
: this.containerProperties.getGroupId();
229+
}
230+
231+
@Override
232+
public String getListenerId() {
233+
return this.beanName; // the container factory sets the bean name to the id attribute
234+
}
235+
236+
@Override
237+
public void setupMessageListener(Object messageListener) {
238+
this.containerProperties.setMessageListener(messageListener);
239+
}
240+
241+
protected abstract void doStart();
242+
243+
protected abstract void doStop();
244+
245+
@Override
246+
public void destroy() {
247+
stop();
248+
}
249+
}

0 commit comments

Comments
 (0)