Commit dad5b8b

GH-3847: Add support for Kafka 4.0 new consumer rebalance protocol (KIP-848)

Fixes: spring-projects#3847
Issue link: spring-projects#3847
KIP details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol

* Add custom assignor warning in DefaultKafkaConsumerFactory for group.protocol=consumer
* Add ConsumerAwareRebalanceListenerTests to verify incremental rebalancing (next gen rebalance) and the legacy rebalancer
* Reference Docs updates

Signed-off-by: Soby Chacko <[email protected]>
1 parent 525a756 commit dad5b8b

File tree

4 files changed: +215 -0 lines changed


spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc

+30
@@ -59,3 +59,33 @@ If you implement `ConsumerRebalanceListener` you should override the default met
This is because the listener container will call its own `onPartitionsRevoked` from its implementation of `onPartitionsLost` after calling the method on your implementation.
If your implementation delegates to the default behavior, `onPartitionsRevoked` will be called twice each time the `Consumer` calls that method on the container's listener.

[[new-rebalance-protocol]]
== Kafka 4.0 Consumer Rebalance Protocol

Spring for Apache Kafka 4.0 supports Apache https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[Kafka 4.0’s new consumer rebalance protocol] (KIP-848), which enhances performance with server-driven, incremental partition assignments.
This reduces rebalancing downtime for consumer groups.

To enable the new protocol, configure the `group.protocol` property:

[source, properties]
----
spring.kafka.consumer.properties.group.protocol=consumer
----

Alternatively, set it programmatically:

[source, java]
----
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
----

The new protocol works seamlessly with `ConsumerAwareRebalanceListener`.
Due to incremental rebalancing, `onPartitionsAssigned` may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol (`group.protocol=classic`).
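
The following sketch (not part of the change shown above) illustrates one way a `ConsumerAwareRebalanceListener` can track the currently owned partitions across several incremental callbacks; the concurrent set and the comments are illustrative assumptions, not an API requirement.

[source, java]
----
ConsumerAwareRebalanceListener listener = new ConsumerAwareRebalanceListener() {

	private final Set<TopicPartition> owned = ConcurrentHashMap.newKeySet();

	@Override
	public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
		// With group.protocol=consumer this may be invoked several times,
		// each call carrying only the newly added subset of partitions.
		this.owned.addAll(partitions);
	}

	@Override
	public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
		// Only the revoked subset is passed; the rest of the assignment is retained.
		this.owned.removeAll(partitions);
	}

};
----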

The new protocol uses server-side partition assignments, ignoring client-side custom assignors set via `spring.kafka.consumer.partition-assignment-strategy`.
A warning is logged if a custom assignor is detected.
To use custom assignors, set `group.protocol=classic`.
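
For example, a configuration along the following lines (a sketch, not part of this commit; the `CooperativeStickyAssignor` choice is only illustrative) keeps the legacy protocol so that a client-side assignor still applies:

[source, java]
----
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "classic");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
		CooperativeStickyAssignor.class.getName());
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
----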

Test applications with `group.protocol=consumer` in a non-production environment to assess rebalancing impacts, and monitor callback frequency for performance tuning.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,9 @@ Several deprecated items have been removed:

* The `BrokerAddress` class now uses `org.apache.kafka.server.network.BrokerEndPoint` instead of the deprecated `kafka.cluster.BrokerEndPoint`
* The `GlobalEmbeddedKafkaTestExecutionListener` has been updated to work solely with KRaft mode

[[x40-new-consumer-rebalance-protocol]]
=== New Consumer Rebalance Protocol

Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalance-protocol[New Consumer Rebalance Protocol docs].

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

+5
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,11 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi
428428

429429
protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
430430
checkBootstrap(configProps);
431+
if ("consumer".equals(configProps.get("group.protocol")) &&
432+
configProps.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)) {
433+
LOGGER.warn("Custom partition assignor ignored with group.protocol=consumer; " +
434+
"server-side assignors will be used.");
435+
}
431436
Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
432437
if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) {
433438
LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " +
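
For reference, a consumer configuration along the following lines would now trigger that warning, because the client-side strategy is ignored once the new protocol is active. This is a sketch, not part of the commit; the bootstrap address, group id, and the `CooperativeStickyAssignor` are placeholders.

[source, java]
----
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("group.protocol", "consumer");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
		CooperativeStickyAssignor.class.getName());

// Creating a consumer from this factory logs the warning above and proceeds with server-side assignment
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
Consumer<String, String> consumer = factory.createConsumer();
----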
@@ -0,0 +1,174 @@
/*
 * Copyright 2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Rudimentary test to verify the consumer rebalance protocols.
 *
 * @author Soby Chacko
 * @since 4.0.0
 */
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = "rebalance.test", partitions = 6)
public class ConsumerRebalanceProtocolTests {

	@Autowired
	private EmbeddedKafkaBroker embeddedKafka;

	@Autowired
	private ConsumerFactory<String, String> consumerFactoryWithNewProtocol;

	@Autowired
	private ConsumerFactory<String, String> consumerFactoryWithLegacyProtocol;

	@Test
	public void testRebalanceWithNewProtocol() throws Exception {
		testRebalance(consumerFactoryWithNewProtocol, "new-protocol-group", true);
	}

	@Test
	public void testRebalanceWithLegacyProtocol() throws Exception {
		testRebalance(consumerFactoryWithLegacyProtocol, "legacy-protocol-group", false);
	}

	private void testRebalance(ConsumerFactory<String, String> consumerFactory, String groupId, boolean isNewProtocol)
			throws Exception {
		AtomicReference<Consumer<?, ?>> consumerRef = new AtomicReference<>();
		CountDownLatch revokedBeforeCommitLatch = new CountDownLatch(1);
		CountDownLatch revokedAfterCommitLatch = new CountDownLatch(1);
		CountDownLatch assignedLatch = new CountDownLatch(1);

		Set<TopicPartition> assignedPartitionSet = new HashSet<>();

		ConsumerAwareRebalanceListener listener = new ConsumerAwareRebalanceListener() {
			@Override
			public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
				consumerRef.set(consumer);
				revokedBeforeCommitLatch.countDown();
				assignedPartitionSet.removeAll(partitions);
			}

			@Override
			public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
				consumerRef.set(consumer);
				revokedAfterCommitLatch.countDown();
				assignedPartitionSet.removeAll(partitions);
			}

			@Override
			public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
				consumerRef.set(consumer);
				assignedLatch.countDown();
				assignedPartitionSet.addAll(partitions);
			}
		};

		ContainerProperties containerProps = new ContainerProperties("rebalance.test");
		containerProps.setGroupId(groupId);
		containerProps.setConsumerRebalanceListener(listener);
		containerProps.setMessageListener((MessageListener<String, String>) (ConsumerRecord<String, String> record) -> {
		});

		KafkaMessageListenerContainer<String, String> container1 = new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
		container1.start();

		Thread.sleep(1000); // Wait for initial assignment

		KafkaMessageListenerContainer<String, String> container2 = new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
		container2.start();

		assertThat(revokedBeforeCommitLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(revokedAfterCommitLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue();

		assertThat(consumerRef.get()).isNotNull();
		assertThat(consumerRef.get()).isInstanceOf(Consumer.class);

		// In both protocols (new vs legacy), we expect the assignments to contain 6 partitions.
		// The way they are assigned is dictated by the Kafka server and is beyond the scope of this test.
		// What we are mostly interested in is that assignment works properly in both protocols.
		// The new protocol may take several incremental rebalances to attain the full assignment,
		// so we wait for a longer duration.
		Awaitility.await().timeout(Duration.ofSeconds(30))
				.untilAsserted(() -> assertThat(assignedPartitionSet).hasSize(6));

		container1.stop();
		container2.stop();
	}

	@Configuration
	@EnableKafka
	public static class Config {

		@Bean
		public ConsumerFactory<String, String> consumerFactoryWithNewProtocol(EmbeddedKafkaBroker embeddedKafka) {
			Map<String, Object> props = new HashMap<>();
			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
			props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-protocol-group");
			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put("group.protocol", "consumer");
			return new DefaultKafkaConsumerFactory<>(props);
		}

		@Bean
		public ConsumerFactory<String, String> consumerFactoryWithLegacyProtocol(EmbeddedKafkaBroker embeddedKafka) {
			Map<String, Object> props = new HashMap<>();
			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
			props.put(ConsumerConfig.GROUP_ID_CONFIG, "legacy-protocol-group");
			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put("group.protocol", "classic");
			return new DefaultKafkaConsumerFactory<>(props);
		}
	}

}
