Skip to content

Commit c3fc82b

Browse files
authored
Add integration test for @headers annotation (#3844)
Signed-off-by: sullis <[email protected]>
1 parent 525a756 commit c3fc82b

File tree

1 file changed

+74
-1
lines changed

1 file changed

+74
-1
lines changed

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+74-1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.kafka.common.errors.TopicExistsException;
6464
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
6565
import org.apache.kafka.common.serialization.ByteArraySerializer;
66+
import org.awaitility.Awaitility;
6667
import org.jspecify.annotations.NonNull;
6768
import org.jspecify.annotations.Nullable;
6869
import org.junit.jupiter.api.Test;
@@ -141,6 +142,7 @@
141142
import org.springframework.messaging.converter.AbstractMessageConverter;
142143
import org.springframework.messaging.converter.SmartMessageConverter;
143144
import org.springframework.messaging.handler.annotation.Header;
145+
import org.springframework.messaging.handler.annotation.Headers;
144146
import org.springframework.messaging.handler.annotation.Payload;
145147
import org.springframework.messaging.handler.annotation.SendTo;
146148
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
@@ -181,6 +183,7 @@
181183
* @author Soby Chacko
182184
* @author Wang Zhiyang
183185
* @author Borahm Lee
186+
* @author Sean Sullivan
184187
*/
185188
@SpringJUnitConfig
186189
@DirtiesContext
@@ -196,7 +199,7 @@
196199
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
197200
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
198201
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42",
199-
"annotated43", "annotated43reply", "seekToComputeFn"})
202+
"annotated43", "annotated43reply", "seekToComputeFn", "headerMapTopic"})
200203
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
201204
public class EnableKafkaIntegrationTests {
202205

@@ -242,6 +245,9 @@ public class EnableKafkaIntegrationTests {
242245
@Autowired
243246
public MultiJsonListenerBean multiJsonListener;
244247

248+
@Autowired
249+
public HeaderMapListenerBean headerMapListener;
250+
245251
@Autowired
246252
public MultiListenerNoDefault multiNoDefault;
247253

@@ -584,6 +590,50 @@ public void testMultiJson() throws Exception {
584590
assertThat(this.multiJsonListener.validated.valCount).isEqualTo(1);
585591
}
586592

593+
@Test
594+
public void testHeadersAnnotation() throws Exception {
595+
template.setDefaultTopic("headerMapTopic");
596+
597+
template.send(new GenericMessage<>("message1", Collections.emptyMap()));
598+
Awaitility.await().untilAsserted(() -> {
599+
assertThat(this.headerMapListener.invocationCount.get()).isEqualTo(1);
600+
});
601+
assertThat(this.headerMapListener.text).isEqualTo("message1");
602+
assertThat(this.headerMapListener.headers)
603+
.isNotNull()
604+
.containsOnlyKeys(
605+
"kafka_offset",
606+
"kafka_consumer",
607+
"kafka_timestampType",
608+
"kafka_receivedPartitionId",
609+
"kafka_receivedTopic",
610+
"kafka_receivedTimestamp",
611+
"kafka_groupId");
612+
613+
template.send(new GenericMessage<>("message2",
614+
Map.of("akey", "avalue",
615+
"bkey", "bvalue")));
616+
Awaitility.await().untilAsserted(() -> {
617+
assertThat(this.headerMapListener.invocationCount.get()).isEqualTo(2);
618+
});
619+
assertThat(this.headerMapListener.text).isEqualTo("message2");
620+
assertThat(this.headerMapListener.headers)
621+
.isNotNull()
622+
.containsOnlyKeys(
623+
"kafka_offset",
624+
"kafka_consumer",
625+
"kafka_timestampType",
626+
"kafka_receivedPartitionId",
627+
"kafka_receivedTopic",
628+
"kafka_receivedTimestamp",
629+
"kafka_groupId",
630+
"akey",
631+
"bkey")
632+
.contains(
633+
Map.entry("akey", "avalue"),
634+
Map.entry("bkey", "bvalue"));
635+
}
636+
587637
@Test
588638
public void testMultiValidateNoDefaultHandler() throws Exception {
589639
this.kafkaJsonTemplate.setDefaultTopic("annotated40");
@@ -1545,6 +1595,11 @@ public MultiJsonListenerBean multiJsonListener() {
15451595
return new MultiJsonListenerBean();
15461596
}
15471597

1598+
@Bean
1599+
public HeaderMapListenerBean headerMapListener() {
1600+
return new HeaderMapListenerBean();
1601+
}
1602+
15481603
@Bean
15491604
public MultiListenerNoDefault multiNoDefault() {
15501605
return new MultiListenerNoDefault();
@@ -2742,6 +2797,24 @@ public void defaultHandler(Bar bar) {
27422797

27432798
}
27442799

2800+
@KafkaListener(id = "headerMap", topics = "headerMapTopic")
2801+
static class HeaderMapListenerBean {
2802+
2803+
final AtomicInteger invocationCount = new AtomicInteger();
2804+
2805+
private String text;
2806+
2807+
private Map<String, Object> headers;
2808+
2809+
@KafkaHandler(isDefault = true)
2810+
public void defaultHandler(@Payload String text, @Headers Map<String, Object> headers) {
2811+
this.text = text;
2812+
this.headers = headers;
2813+
this.invocationCount.incrementAndGet();
2814+
}
2815+
2816+
}
2817+
27452818
@KafkaListener(id = "multiNoDefault", topics = "annotated40", containerFactory = "kafkaJsonListenerContainerFactory2")
27462819
static class MultiListenerNoDefault {
27472820

0 commit comments

Comments
 (0)