Skip to content

Commit 34c5619

Browse files
garyrussellartembilan
authored andcommitted
GH-2208: Fix Manual Nack with Mutating Interceptor
Resolves #2208 `equals()` test on `ConsumerRecord` fails (not implemented). Compare topic, partition and offset instead. **cherry-pick to 2.9.x, 2.8.x, 2.7.x**
1 parent 6022835 commit 34c5619

File tree

3 files changed

+22
-2
lines changed

3 files changed

+22
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -2447,14 +2447,20 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
24472447
Iterator<ConsumerRecord<K, V>> iterator2 = records.iterator();
24482448
while (iterator2.hasNext()) {
24492449
ConsumerRecord<K, V> next = iterator2.next();
2450-
if (next.equals(record) || list.size() > 0) {
2450+
if (list.size() > 0 || recordsEqual(record, next)) {
24512451
list.add(next);
24522452
}
24532453
}
24542454
SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
24552455
pauseForNackSleep();
24562456
}
24572457

2458+
private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec2) {
2459+
return rec1.topic().equals(rec2.topic())
2460+
&& rec1.partition() == rec2.partition()
2461+
&& rec1.offset() == rec2.offset();
2462+
}
2463+
24582464
private void pauseForNackSleep() {
24592465
if (this.nackSleep > 0) {
24602466
this.nackWake = System.currentTimeMillis() + this.nackSleep;

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ public interface RecordInterceptor<K, V> extends ThreadStateProcessor {
3737

3838
/**
3939
* Perform some action on the record or return a different one. If null is returned
40-
* the record will be skipped. Invoked before the listener.
40+
* the record will be skipped. Invoked before the listener. IMPORTANT; if this method
41+
* returns a different record, the topic, partition and offset must not be changed
42+
* to avoid undesirable side-effects.
4143
* @param record the record.
4244
* @param consumer the consumer.
4345
* @return the record or null.

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java

+12
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6262
import org.springframework.kafka.support.Acknowledgment;
6363
import org.springframework.kafka.test.utils.KafkaTestUtils;
64+
import org.springframework.lang.Nullable;
6465
import org.springframework.test.annotation.DirtiesContext;
6566
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6667

@@ -243,6 +244,17 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
243244
factory.setConsumerFactory(consumerFactory());
244245
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
245246
factory.getContainerProperties().setMissingTopicsFatal(false);
247+
factory.setRecordInterceptor(new RecordInterceptor() {
248+
249+
@Override
250+
@Nullable
251+
public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) {
252+
return new ConsumerRecord(record.topic(), record.partition(), record.offset(), 0L,
253+
TimestampType.NO_TIMESTAMP_TYPE, 0, 0, record.key(), record.value(), record.headers(),
254+
Optional.empty());
255+
}
256+
257+
});
246258
return factory;
247259
}
248260

0 commit comments

Comments
 (0)