Commit 272c4b0
GH-3847: Add support for Kafka 4.0 new consumer rebalance protocol
Fixes: #3847
Issue link: #3847
KIP details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol

* Add custom assignor warning in DefaultKafkaConsumerFactory for group.protocol=consumer
* Add ConsumerAwareRebalanceListenerTests to verify incremental rebalancing (next gen rebalance) and the legacy rebalancer
* Reference Docs updates

Signed-off-by: Soby Chacko <[email protected]>
1 parent 81d96e1 commit 272c4b0

File tree

4 files changed: +219 / -0 lines changed


spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc

+31
@@ -59,3 +59,34 @@ If you implement `ConsumerRebalanceListener` you should override the default met
This is because the listener container will call its own `onPartitionsRevoked` from its implementation of `onPartitionsLost` after calling the method on your implementation.
If your implementation delegates to the default behavior, `onPartitionsRevoked` will be called twice each time the `Consumer` calls that method on the container's listener.

[[new-rebalalcne-protocol]]
== Kafka 4.0 Consumer Rebalance Protocol

Spring for Apache Kafka 4.0 supports Apache https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[Kafka 4.0’s new consumer rebalance protocol] (KIP-848), which enhances performance with server-driven, incremental partition assignments.
This reduces rebalancing downtime for consumer groups.

To enable the new protocol, configure the `group.protocol` property:

[source, properties]
----
spring.kafka.consumer.properties.group.protocol=consumer
----

Keep in mind that the above property is a Spring Boot property.
If you are not using Spring Boot, you may want to set it manually as shown below.

Alternatively, set it programmatically:

[source, java]
----
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
----

The new protocol works seamlessly with `ConsumerAwareRebalanceListener`.
Due to incremental rebalancing, `onPartitionsAssigned` may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol.

The new protocol uses server-side partition assignments, ignoring client-side custom assignors set via `spring.kafka.consumer.partition-assignment-strategy`.
A warning is logged if a custom assignor is detected.
To use custom assignors, set `group.protocol=classic` (which is the default if you don't specify a value for `group.protocol`).
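Because `onPartitionsAssigned` can fire several times with partial partition sets under the incremental protocol, listener state should accumulate assignments rather than replace them wholesale. The following is a minimal plain-Java sketch of that bookkeeping; the class name, helper methods, and string partition names are illustrative stand-ins, not part of the Spring for Apache Kafka or Kafka client API:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: with incremental rebalancing, assignment callbacks deliver partial
// sets, so a listener should accumulate them rather than clear-and-set.
// Partition names here are illustrative strings, not real TopicPartition objects.
public class IncrementalAssignmentSketch {

	public static void main(String[] args) {
		Set<String> assigned = new HashSet<>();

		// First incremental callback assigns two partitions.
		onAssigned(assigned, List.of("rebalance.test-0", "rebalance.test-1"));
		// A later callback adds another partition instead of restating them all.
		onAssigned(assigned, List.of("rebalance.test-2"));
		// A revocation removes only the partitions being moved away.
		onRevoked(assigned, List.of("rebalance.test-0"));

		System.out.println(assigned.size()); // two partitions remain assigned
	}

	static void onAssigned(Set<String> assigned, List<String> partitions) {
		assigned.addAll(partitions); // accumulate, do not replace
	}

	static void onRevoked(Set<String> assigned, List<String> partitions) {
		assigned.removeAll(partitions);
	}
}
```

The same accumulate-on-assign, subtract-on-revoke pattern is what the test added in this commit uses with real `TopicPartition` instances.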

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

+6
@@ -64,3 +64,9 @@ Several deprecated items have been removed:

* The `BrokerAddress` class now uses `org.apache.kafka.server.network.BrokerEndPoint` instead of the deprecated `kafka.cluster.BrokerEndPoint`
* The `GlobalEmbeddedKafkaTestExecutionListener` has been updated to work solely with KRaft mode

[[x40-new-consumer-rebalance-protocol]]
=== New Consumer Rebalance Protocol

Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalance Protocol docs].

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

+5
@@ -428,6 +428,11 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi

	protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
		checkBootstrap(configProps);
		if ("consumer".equals(configProps.get("group.protocol")) &&
				configProps.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)) {
			LOGGER.warn("Custom partition assignor ignored with group.protocol=consumer; " +
					"server-side assignors will be used.");
		}
		Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
		if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) {
			LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " +
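The guard added above inspects only two configuration keys, so the warning condition can be exercised in isolation. A minimal plain-Java sketch follows; the class name and `shouldWarn` helper are hypothetical, and the config key is spelled as a literal rather than via the `ConsumerConfig` constant:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the warning condition: a client-side assignor is ignored when the
// server-driven "consumer" protocol is active, so the factory logs a warning.
public class AssignorWarningSketch {

	static boolean shouldWarn(Map<String, Object> configProps) {
		return "consumer".equals(configProps.get("group.protocol"))
				&& configProps.containsKey("partition.assignment.strategy");
	}

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put("group.protocol", "consumer");
		props.put("partition.assignment.strategy",
				"org.apache.kafka.clients.consumer.RoundRobinAssignor");
		System.out.println(shouldWarn(props)); // true: custom assignor would be ignored

		props.put("group.protocol", "classic");
		System.out.println(shouldWarn(props)); // false: classic protocol honors it
	}
}
```

Note that the check warns rather than fails: the consumer still starts, and server-side assignors take over.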
@@ -0,0 +1,177 @@
/*
 * Copyright 2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Rudimentary test to verify the consumer rebalance protocols.
 *
 * @author Soby Chacko
 * @since 4.0.0
 */
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = "rebalance.test", partitions = 6)
public class ConsumerRebalanceProtocolTests {

	@Autowired
	private EmbeddedKafkaBroker embeddedKafka;

	@Autowired
	private ConsumerFactory<String, String> consumerFactoryWithNewProtocol;

	@Autowired
	private ConsumerFactory<String, String> consumerFactoryWithLegacyProtocol;

	@Test
	public void testRebalanceWithNewProtocol() throws Exception {
		testRebalance(consumerFactoryWithNewProtocol, "new-protocol-group", true);
	}

	@Test
	public void testRebalanceWithLegacyProtocol() throws Exception {
		testRebalance(consumerFactoryWithLegacyProtocol, "legacy-protocol-group", false);
	}

	private void testRebalance(ConsumerFactory<String, String> consumerFactory, String groupId, boolean isNewProtocol)
			throws Exception {
		AtomicReference<Consumer<?, ?>> consumerRef = new AtomicReference<>();
		CountDownLatch revokedBeforeCommitLatch = new CountDownLatch(1);
		CountDownLatch revokedAfterCommitLatch = new CountDownLatch(1);
		CountDownLatch assignedLatch = new CountDownLatch(1);

		Set<TopicPartition> assignedPartitionSet = new HashSet<>();

		ConsumerAwareRebalanceListener listener = new ConsumerAwareRebalanceListener() {
			@Override
			public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
				consumerRef.set(consumer);
				revokedBeforeCommitLatch.countDown();
				assignedPartitionSet.removeAll(partitions);
			}

			@Override
			public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
				consumerRef.set(consumer);
				revokedAfterCommitLatch.countDown();
				assignedPartitionSet.removeAll(partitions);
			}

			@Override
			public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
				consumerRef.set(consumer);
				assignedLatch.countDown();
				assignedPartitionSet.addAll(partitions);
			}
		};

		KafkaMessageListenerContainer<String, String> container1 =
				new KafkaMessageListenerContainer<>(consumerFactory, getContainerProperties(groupId, listener));
		container1.start();

		assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue();

		KafkaMessageListenerContainer<String, String> container2 =
				new KafkaMessageListenerContainer<>(consumerFactory, getContainerProperties(groupId, listener));
		container2.start();

		assertThat(revokedBeforeCommitLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(revokedAfterCommitLatch.await(10, TimeUnit.SECONDS)).isTrue();

		assertThat(consumerRef.get()).isNotNull();
		assertThat(consumerRef.get()).isInstanceOf(Consumer.class);

		// In both protocols (new vs legacy), we expect the assignments to contain 6 partitions.
		// The way they get assigned is dictated by the Kafka server and beyond the scope of this test.
		// What we are mostly interested in is that assignment works properly in both protocols.
		// The new protocol may take several incremental rebalances to attain the full assignment - thus
		// waiting for a longer duration.
		Awaitility.await().timeout(Duration.ofSeconds(30))
				.untilAsserted(() -> assertThat(assignedPartitionSet).hasSize(6));

		container1.stop();
		container2.stop();
	}

	private static @NotNull ContainerProperties getContainerProperties(String groupId, ConsumerAwareRebalanceListener listener) {
		ContainerProperties containerProps = new ContainerProperties("rebalance.test");
		containerProps.setGroupId(groupId);
		containerProps.setConsumerRebalanceListener(listener);
		containerProps.setMessageListener((MessageListener<String, String>) (ConsumerRecord<String, String> record) -> {
		});
		return containerProps;
	}

	@Configuration
	public static class Config {

		@Bean
		public ConsumerFactory<String, String> consumerFactoryWithNewProtocol(EmbeddedKafkaBroker embeddedKafka) {
			Map<String, Object> props = new HashMap<>();
			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
			props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-protocol-group");
			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put("group.protocol", "consumer");
			return new DefaultKafkaConsumerFactory<>(props);
		}

		@Bean
		public ConsumerFactory<String, String> consumerFactoryWithLegacyProtocol(EmbeddedKafkaBroker embeddedKafka) {
			Map<String, Object> props = new HashMap<>();
			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
			props.put(ConsumerConfig.GROUP_ID_CONFIG, "legacy-protocol-group");
			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
			props.put("group.protocol", "classic");
			return new DefaultKafkaConsumerFactory<>(props);
		}
	}

}
