From dad5b8b42c07e0ba0a3bcf3a257c398a8d6ddc1d Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 16 Apr 2025 22:03:18 -0400 Subject: [PATCH 1/4] GH-3847: Add support for Kafka 4.0 new consumer rebalance protocol (KIP-848) Fixes: #3847 Issue link: https://github.com/spring-projects/spring-kafka/issues/3847 KIP details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol * Add custom assignor warning in DefaultKafkaConsumerFactory for group.protocol=consumer * Add ConsumerAwareRebalanceListenerTests to verify incremental rebalancing (next gen rebalance) and the legacy rebalancer * Reference Docs updates Signed-off-by: Soby Chacko --- .../rebalance-listeners.adoc | 30 +++ .../antora/modules/ROOT/pages/whats-new.adoc | 6 + .../core/DefaultKafkaConsumerFactory.java | 5 + .../ConsumerRebalanceProtocolTests.java | 174 ++++++++++++++++++ 4 files changed, 215 insertions(+) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc index d1db7d38f1..a9c45a04fd 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc @@ -59,3 +59,33 @@ If you implement `ConsumerRebalanceListener` you should override the default met This is because the listener container will call its own `onPartitionsRevoked` from its implementation of `onPartitionsLost` after calling the method on your implementation. If you implementation delegates to the default behavior, `onPartitionsRevoked` will be called twice each time the `Consumer` calls that method on the container's listener. +[[new-rebalalcne-protocol]] +== Kafka 4.0 Consumer Rebalance Protocol + +Spring for Apache Kafka 4.0 supports Apache https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[Kafka 4.0’s new consumer rebalance protocol] (KIP-848), which enhances performance with server-driven, incremental partition assignments. +This reduces rebalancing downtime for consumer groups. + +To enable the new protocol, configure the `group.protocol` property: + +[source, properties] +---- +spring.kafka.consumer.properties.group.protocol=consumer +---- + +Alternatively, set it programmatically: + +[source, java] +---- +Map props = new HashMap<>(); +props.put("group.protocol", "consumer"); +ConsumerFactory factory = new DefaultKafkaConsumerFactory<>(props); +---- + +The new protocol works seamlessly with `ConsumerAwareRebalanceListener`. +Due to incremental rebalancing, `onPartitionsAssigned` may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol (`group.protocol=classic`). + +The new protocol uses server-side partition assignments, ignoring client-side custom assignors set via `spring.kafka.consumer.partition-assignment-strategy`. +A warning is logged if a custom assignor is detected. +To use custom assignors, set `group.protocol=classic`. + +Test applications with `group.protocol=consumer` in a non-production environment to assess rebalancing impacts, and monitor callback frequency for performance tuning. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index bd65369f20..189fe5200f 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -64,3 +64,9 @@ Several deprecated items have been removed: * The `BrokerAddress` class now uses `org.apache.kafka.server.network.BrokerEndPoint` instead of the deprecated `kafka.cluster.BrokerEndPoint` * The `GlobalEmbeddedKafkaTestExecutionListener` has been updated to work solely with KRaft mode + +[[x40-new-consumer-rebalance-protocol]] +=== New Consumer Rebalance Protocol + +Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848]. +For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs]. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 5308294137..fd1a4b02e7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -428,6 +428,11 @@ private void checkInaccessible(Properties properties, Map modifi protected Consumer createKafkaConsumer(Map configProps) { checkBootstrap(configProps); + if ("consumer".equals(configProps.get("group.protocol")) && + configProps.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)) { + LOGGER.warn("Custom partition assignor ignored with group.protocol=consumer; " + + "server-side assignors will be used."); + } Consumer kafkaConsumer = createRawConsumer(configProps); if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) { LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " + diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java new file mode 100644 index 0000000000..ef97575146 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java @@ -0,0 +1,174 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Rudimentary test to verify the consumer rebalance protocols. + * + * @author Soby Chacko + * @since 4.0.0 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = "rebalance.test", partitions = 6) +public class ConsumerRebalanceProtocolTests { + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + @Autowired + private ConsumerFactory consumerFactoryWithNewProtocol; + + @Autowired + private ConsumerFactory consumerFactoryWithLegacyProtocol; + + @Test + public void testRebalanceWithNewProtocol() throws Exception { + testRebalance(consumerFactoryWithNewProtocol, "new-protocol-group", true); + } + + @Test + public void testRebalanceWithLegacyProtocol() throws Exception { + testRebalance(consumerFactoryWithLegacyProtocol, "legacy-protocol-group", false); + } + + private void testRebalance(ConsumerFactory consumerFactory, String groupId, boolean isNewProtocol) + throws Exception { + AtomicReference> consumerRef = new AtomicReference<>(); + CountDownLatch revokedBeforeCommitLatch = new CountDownLatch(1); + CountDownLatch revokedAfterCommitLatch = new CountDownLatch(1); + CountDownLatch assignedLatch = new CountDownLatch(1); + + Set assignedPartitionSet = new HashSet<>(); + + ConsumerAwareRebalanceListener listener = new ConsumerAwareRebalanceListener() { + @Override + public void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions) { + consumerRef.set(consumer); + revokedBeforeCommitLatch.countDown(); + assignedPartitionSet.removeAll(partitions); + } + + @Override + public void onPartitionsRevokedAfterCommit(Consumer consumer, Collection partitions) { + consumerRef.set(consumer); + revokedAfterCommitLatch.countDown(); + assignedPartitionSet.removeAll(partitions); + } + + @Override + public void onPartitionsAssigned(Consumer consumer, Collection partitions) { + consumerRef.set(consumer); + assignedLatch.countDown(); + assignedPartitionSet.addAll(partitions); + } + }; + + ContainerProperties containerProps = new ContainerProperties("rebalance.test1"); + containerProps.setGroupId(groupId); + containerProps.setConsumerRebalanceListener(listener); + containerProps.setMessageListener((MessageListener) (ConsumerRecord record) -> { + }); + + KafkaMessageListenerContainer container1 = new KafkaMessageListenerContainer<>(consumerFactory, containerProps); + container1.start(); + + Thread.sleep(1000); // Wait for initial assignment + + KafkaMessageListenerContainer container2 = new KafkaMessageListenerContainer<>(consumerFactory, containerProps); + container2.start(); + + assertThat(revokedBeforeCommitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(revokedAfterCommitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(consumerRef.get()).isNotNull(); + assertThat(consumerRef.get()).isInstanceOf(Consumer.class); + + // In both protocols (new vs legacy), we expect the assignments to contain 6 partitions. + // The way they get assigned are dictated by the Kafka server and beyond the scope of this test. + // What we are mostly interested is assignment works properly in both protocols. + // The new protocol may take several incremental rebalances to attain the full assignments - thus waiting for a + // longer duration. + Awaitility.await().timeout(Duration.ofSeconds(30)) + .untilAsserted(() -> assertThat(assignedPartitionSet.size() == 6).isTrue()); + + container1.stop(); + container2.stop(); + } + + @Configuration + @EnableKafka + public static class Config { + + @Bean + public ConsumerFactory consumerFactoryWithNewProtocol(EmbeddedKafkaBroker embeddedKafka) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-protocol-group"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put("group.protocol", "consumer"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConsumerFactory consumerFactoryWithLegacyProtocol(EmbeddedKafkaBroker embeddedKafka) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "legacy-protocol-group"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put("group.protocol", "classic"); + return new DefaultKafkaConsumerFactory<>(props); + } + } + +} From ebca9ca3547dd972dca2068333dfdbed9a061907 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Sun, 20 Apr 2025 11:19:40 -0400 Subject: [PATCH 2/4] PR review --- .../kafka/receiving-messages/rebalance-listeners.adoc | 9 +++++---- .../kafka/listener/ConsumerRebalanceProtocolTests.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc index a9c45a04fd..d04ae133fb 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc @@ -72,6 +72,9 @@ To enable the new protocol, configure the `group.protocol` property: spring.kafka.consumer.properties.group.protocol=consumer ---- +Keep in mind that, the above property is a Spring Boot property. +If you are not using Spring Boot, you may want to set it manually as shown below. + Alternatively, set it programmatically: [source, java] @@ -82,10 +85,8 @@ ConsumerFactory factory = new DefaultKafkaConsumerFactory<>(prop ---- The new protocol works seamlessly with `ConsumerAwareRebalanceListener`. -Due to incremental rebalancing, `onPartitionsAssigned` may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol (`group.protocol=classic`). +Due to incremental rebalancing, `onPartitionsAssigned` may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol. The new protocol uses server-side partition assignments, ignoring client-side custom assignors set via `spring.kafka.consumer.partition-assignment-strategy`. A warning is logged if a custom assignor is detected. -To use custom assignors, set `group.protocol=classic`. - -Test applications with `group.protocol=consumer` in a non-production environment to assess rebalancing impacts, and monitor callback frequency for performance tuning. +To use custom assignors, set `group.protocol=classic` (which is the default if you don't specify a value for `group.protocol`). diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java index ef97575146..dd93902dbf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java @@ -109,7 +109,7 @@ public void onPartitionsAssigned(Consumer consumer, Collection) (ConsumerRecord record) -> { From 1ec000f507a0e70bd131cf5396f8a413252792c3 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 21 Apr 2025 11:08:43 -0400 Subject: [PATCH 3/4] PR review --- .../kafka/listener/ConsumerRebalanceProtocolTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java index dd93902dbf..389ddb6ace 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java @@ -37,7 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -143,7 +142,6 @@ public void onPartitionsAssigned(Consumer consumer, Collection Date: Mon, 21 Apr 2025 14:45:27 -0400 Subject: [PATCH 4/4] PR review Signed-off-by: Soby Chacko --- .../ConsumerRebalanceProtocolTests.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java index 389ddb6ace..d824dc933a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.awaitility.Awaitility; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -108,23 +109,18 @@ public void onPartitionsAssigned(Consumer consumer, Collection) (ConsumerRecord record) -> { - }); - - KafkaMessageListenerContainer container1 = new KafkaMessageListenerContainer<>(consumerFactory, containerProps); + KafkaMessageListenerContainer container1 = + new KafkaMessageListenerContainer<>(consumerFactory, getContainerProperties(groupId, listener)); container1.start(); - Thread.sleep(1000); // Wait for initial assignment + assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue(); - KafkaMessageListenerContainer container2 = new KafkaMessageListenerContainer<>(consumerFactory, containerProps); + KafkaMessageListenerContainer container2 = + new KafkaMessageListenerContainer<>(consumerFactory, getContainerProperties(groupId, listener)); container2.start(); assertThat(revokedBeforeCommitLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(revokedAfterCommitLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(consumerRef.get()).isNotNull(); assertThat(consumerRef.get()).isInstanceOf(Consumer.class); @@ -141,6 +137,15 @@ public void onPartitionsAssigned(Consumer consumer, Collection) (ConsumerRecord record) -> { + }); + return containerProps; + } + @Configuration public static class Config {