diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc
index d1db7d38f1..d04ae133fb 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/rebalance-listeners.adoc
@@ -59,3 +59,34 @@ If you implement `ConsumerRebalanceListener` you should override the default met
 This is because the listener container will call its own `onPartitionsRevoked` from its implementation of `onPartitionsLost` after calling the method on your implementation.
 If you implementation delegates to the default behavior, `onPartitionsRevoked` will be called twice each time the `Consumer` calls that method on the container's listener.
 
+[[new-rebalance-protocol]]
+== Kafka 4.0 Consumer Rebalance Protocol
+
+Spring for Apache Kafka 4.0 supports Apache https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[Kafka 4.0's new consumer rebalance protocol] (KIP-848), which improves rebalance performance through server-driven, incremental partition assignments.
+This reduces rebalancing downtime for consumer groups.
+
+To enable the new protocol, configure the `group.protocol` property:
+
+[source, properties]
+----
+spring.kafka.consumer.properties.group.protocol=consumer
+----
+
+Keep in mind that the above property is a Spring Boot property.
+If you are not using Spring Boot, you need to set it directly in the consumer factory configuration.
+
+For example, to set it programmatically:
+
+[source, java]
+----
+Map<String, Object> props = new HashMap<>();
+props.put("group.protocol", "consumer");
+ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
+----
+
+The new protocol works seamlessly with `ConsumerAwareRebalanceListener`.
+Due to incremental rebalancing, `onPartitionsAssigned` may be called multiple times with smaller partition sets, unlike the single callback typical of the legacy protocol.
+
+The new protocol uses server-side partition assignments, ignoring client-side custom assignors set via `spring.kafka.consumer.partition-assignment-strategy`.
+A warning is logged if a custom assignor is detected.
+To use custom assignors, set `group.protocol=classic` (which is the default if you don't specify a value for `group.protocol`).
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
index bd65369f20..189fe5200f 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
@@ -64,3 +64,9 @@ Several deprecated items have been removed:
 
 * The `BrokerAddress` class now uses `org.apache.kafka.server.network.BrokerEndPoint` instead of the deprecated `kafka.cluster.BrokerEndPoint`
 * The `GlobalEmbeddedKafkaTestExecutionListener` has been updated to work solely with KRaft mode
+
+[[x40-new-consumer-rebalance-protocol]]
+=== New Consumer Rebalance Protocol
+
+Spring for Apache Kafka 4.0 supports Kafka 4.0's new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
+For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalance-protocol[New Consumer Rebalance Protocol docs].
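As a rough illustration of the incremental behavior described in the new documentation section above: with `group.protocol=consumer`, `onPartitionsAssigned` may deliver only the partitions added by the latest rebalance, so a `ConsumerAwareRebalanceListener` that needs the full picture has to accumulate them. The sketch below is illustrative only; the class name is made up and it is not part of this change set:

[source, java]
----
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Illustrative sketch (not part of this patch): tracks the full assignment
// across the incremental callbacks used by the KIP-848 protocol.
public class IncrementalAssignmentListener implements ConsumerAwareRebalanceListener {

	private final Set<TopicPartition> currentAssignment = ConcurrentHashMap.newKeySet();

	@Override
	public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
		// May be invoked several times, each call carrying only newly added partitions
		this.currentAssignment.addAll(partitions);
	}

	@Override
	public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
		// Revocations are incremental as well; remove only what was revoked
		this.currentAssignment.removeAll(partitions);
	}

	public Set<TopicPartition> currentAssignment() {
		return this.currentAssignment;
	}

}
----

Registering such a listener via `ContainerProperties.setConsumerRebalanceListener(...)` works the same way under both protocols; only the granularity of the callbacks differs.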
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
index 5308294137..fd1a4b02e7 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
@@ -428,6 +428,11 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi
 
 	protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
 		checkBootstrap(configProps);
+		if ("consumer".equals(configProps.get("group.protocol")) &&
+				configProps.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)) {
+			LOGGER.warn("Custom partition assignor ignored with group.protocol=consumer; " +
+					"server-side assignors will be used.");
+		}
 		Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
 		if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) {
 			LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " +
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java
new file mode 100644
index 0000000000..d824dc933a
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerRebalanceProtocolTests.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Rudimentary test to verify the consumer rebalance protocols.
+ *
+ * @author Soby Chacko
+ * @since 4.0.0
+ */
+@SpringJUnitConfig
+@DirtiesContext
+@EmbeddedKafka(topics = "rebalance.test", partitions = 6)
+public class ConsumerRebalanceProtocolTests {
+
+	@Autowired
+	private EmbeddedKafkaBroker embeddedKafka;
+
+	@Autowired
+	private ConsumerFactory<String, String> consumerFactoryWithNewProtocol;
+
+	@Autowired
+	private ConsumerFactory<String, String> consumerFactoryWithLegacyProtocol;
+
+	@Test
+	public void testRebalanceWithNewProtocol() throws Exception {
+		testRebalance(consumerFactoryWithNewProtocol, "new-protocol-group", true);
+	}
+
+	@Test
+	public void testRebalanceWithLegacyProtocol() throws Exception {
+		testRebalance(consumerFactoryWithLegacyProtocol, "legacy-protocol-group", false);
+	}
+
+	private void testRebalance(ConsumerFactory<String, String> consumerFactory, String groupId, boolean isNewProtocol)
+			throws Exception {
+		AtomicReference<Consumer<?, ?>> consumerRef = new AtomicReference<>();
+		CountDownLatch revokedBeforeCommitLatch = new CountDownLatch(1);
+		CountDownLatch revokedAfterCommitLatch = new CountDownLatch(1);
+		CountDownLatch assignedLatch = new CountDownLatch(1);
+
+		Set<TopicPartition> assignedPartitionSet = new HashSet<>();
+
+		ConsumerAwareRebalanceListener listener = new ConsumerAwareRebalanceListener() {
+			@Override
+			public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+				consumerRef.set(consumer);
+				revokedBeforeCommitLatch.countDown();
+				assignedPartitionSet.removeAll(partitions);
+			}
+
+			@Override
+			public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+				consumerRef.set(consumer);
+				revokedAfterCommitLatch.countDown();
+				assignedPartitionSet.removeAll(partitions);
+			}
+
+			@Override
+			public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+				consumerRef.set(consumer);
+				assignedLatch.countDown();
+				assignedPartitionSet.addAll(partitions);
+			}
+		};
+
+		KafkaMessageListenerContainer<String, String> container1 =
+				new KafkaMessageListenerContainer<>(consumerFactory, getContainerProperties(groupId, listener));
+		container1.start();
+
+		assertThat(assignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+		KafkaMessageListenerContainer<String, String> container2 =
+				new KafkaMessageListenerContainer<>(consumerFactory, getContainerProperties(groupId, listener));
+		container2.start();
+
+		assertThat(revokedBeforeCommitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(revokedAfterCommitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+		assertThat(consumerRef.get()).isNotNull();
+		assertThat(consumerRef.get()).isInstanceOf(Consumer.class);
+
+		// In both protocols (new vs legacy), we expect the assignments to contain 6 partitions.
+		// The way they get assigned is dictated by the Kafka server and is beyond the scope of this test.
+		// What we are mostly interested in is that assignment works properly in both protocols.
+		// The new protocol may take several incremental rebalances to attain the full assignment - thus we wait for a
+		// longer duration.
+		Awaitility.await().timeout(Duration.ofSeconds(30))
+				.untilAsserted(() -> assertThat(assignedPartitionSet.size() == 6).isTrue());
+
+		container1.stop();
+		container2.stop();
+	}
+
+	private static @NotNull ContainerProperties getContainerProperties(String groupId, ConsumerAwareRebalanceListener listener) {
+		ContainerProperties containerProps = new ContainerProperties("rebalance.test");
+		containerProps.setGroupId(groupId);
+		containerProps.setConsumerRebalanceListener(listener);
+		containerProps.setMessageListener((MessageListener<String, String>) (ConsumerRecord<String, String> record) -> {
+		});
+		return containerProps;
+	}
+
+	@Configuration
+	public static class Config {
+
+		@Bean
+		public ConsumerFactory<String, String> consumerFactoryWithNewProtocol(EmbeddedKafkaBroker embeddedKafka) {
+			Map<String, Object> props = new HashMap<>();
+			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
+			props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-protocol-group");
+			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+			props.put("group.protocol", "consumer");
+			return new DefaultKafkaConsumerFactory<>(props);
+		}
+
+		@Bean
+		public ConsumerFactory<String, String> consumerFactoryWithLegacyProtocol(EmbeddedKafkaBroker embeddedKafka) {
+			Map<String, Object> props = new HashMap<>();
+			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
+			props.put(ConsumerConfig.GROUP_ID_CONFIG, "legacy-protocol-group");
+			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+			props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+			props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+			props.put("group.protocol", "classic");
+			return new DefaultKafkaConsumerFactory<>(props);
+		}
+	}
+
+}
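For context on the `DefaultKafkaConsumerFactory` change above: the new warning is emitted at consumer-creation time whenever `group.protocol=consumer` is combined with an explicit `partition.assignment.strategy` entry. A minimal sketch of a configuration exercising that path is shown below; the broker address, group id, and class name are placeholders, not values taken from this patch:

[source, java]
----
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Illustrative only; names and addresses are placeholders.
public class NewProtocolFactorySketch {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip848-demo-group"); // placeholder group
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		// Opt in to the KIP-848 protocol; partition assignment becomes server-driven.
		props.put("group.protocol", "consumer");
		// Adding a client-side assignor here, e.g.
		// props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
		// is the combination that makes createKafkaConsumer(...) log the
		// "Custom partition assignor ignored" warning added in this change.

		DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);
		factory.createConsumer().close(); // consumer is created with the new protocol enabled
	}

}
----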