Skip to content

Commit 3a2d562

Browse files
committed
Add Spring Pulsar container property customizers
This commit adds the ability for users to register property customizers for the auto-configured Spring Pulsar listener containers. See spring-projects#36347
1 parent 845c4dd commit 3a2d562

File tree

8 files changed

+317
-7
lines changed

8 files changed

+317
-7
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,12 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
149149
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
150150
ConcurrentPulsarListenerContainerFactory<Object> pulsarListenerContainerFactory(
151151
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
152-
TopicResolver topicResolver) {
152+
TopicResolver topicResolver, ObjectProvider<PulsarContainerPropertiesCustomizer> customizersProvider) {
153153
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
154154
containerProperties.setSchemaResolver(schemaResolver);
155155
containerProperties.setTopicResolver(topicResolver);
156156
this.propertiesMapper.customizeContainerProperties(containerProperties);
157+
customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(containerProperties));
157158
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
158159
}
159160

@@ -178,10 +179,12 @@ private void applyReaderBuilderCustomizers(List<ReaderBuilderCustomizer<?>> cust
178179
@Bean
179180
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
180181
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
181-
SchemaResolver schemaResolver) {
182+
SchemaResolver schemaResolver,
183+
ObjectProvider<PulsarReaderContainerPropertiesCustomizer> customizersProvider) {
182184
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
183185
readerContainerProperties.setSchemaResolver(schemaResolver);
184186
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
187+
customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(readerContainerProperties));
185188
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
186189
}
187190

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.listener.PulsarContainerProperties;
20+
21+
/**
22+
* The interface to customize a {@link PulsarContainerProperties}.
23+
*
24+
* @author Chris Bono
25+
* @since 3.2.0
26+
*/
27+
@FunctionalInterface
28+
public interface PulsarContainerPropertiesCustomizer {
29+
30+
/**
31+
* Customizes a {@link PulsarContainerProperties}.
32+
* @param containerProperties the container properties to customize
33+
*/
34+
void customize(PulsarContainerProperties containerProperties);
35+
36+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -156,14 +156,25 @@ private void applyMessageConsumerBuilderCustomizers(List<ReactiveMessageConsumer
156156
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
157157
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
158158
ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver,
159-
TopicResolver topicResolver) {
159+
TopicResolver topicResolver,
160+
ObjectProvider<ReactivePulsarContainerPropertiesCustomizer<?>> customizersProvider) {
160161
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
161162
containerProperties.setSchemaResolver(schemaResolver);
162163
containerProperties.setTopicResolver(topicResolver);
163-
this.propertiesMapper.customizeContainerProperties(containerProperties);
164+
List<ReactivePulsarContainerPropertiesCustomizer<?>> customizers = new ArrayList<>();
165+
customizers.add(this.propertiesMapper::customizeContainerProperties);
166+
customizers.addAll(customizersProvider.orderedStream().toList());
167+
applyContainerPropertiesCustomizers(customizers, containerProperties);
164168
return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties);
165169
}
166170

171+
@SuppressWarnings("unchecked")
172+
private void applyContainerPropertiesCustomizers(List<ReactivePulsarContainerPropertiesCustomizer<?>> customizers,
173+
ReactivePulsarContainerProperties<?> containerProperties) {
174+
LambdaSafe.callbacks(ReactivePulsarContainerPropertiesCustomizer.class, customizers, containerProperties)
175+
.invoke((customizer) -> customizer.customize(containerProperties));
176+
}
177+
167178
@Bean
168179
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
169180
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
20+
21+
/**
22+
* The interface to customize a {@link PulsarReaderContainerProperties}.
23+
*
24+
* @author Chris Bono
25+
* @since 3.2.0
26+
*/
27+
@FunctionalInterface
28+
public interface PulsarReaderContainerPropertiesCustomizer {
29+
30+
/**
31+
* Customizes a {@link PulsarReaderContainerProperties}.
32+
* @param containerProperties the container properties to customize
33+
*/
34+
void customize(PulsarReaderContainerProperties containerProperties);
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;
20+
21+
/**
22+
* The interface to customize a {@link ReactivePulsarContainerProperties}.
23+
*
24+
* @param <T> the message payload type
25+
* @author Chris Bono
26+
* @since 3.2.0
27+
*/
28+
@FunctionalInterface
29+
public interface ReactivePulsarContainerPropertiesCustomizer<T> {
30+
31+
/**
32+
* Customizes a {@link ReactivePulsarContainerProperties}.
33+
* @param containerProperties the container properties to customize
34+
*/
35+
void customize(ReactivePulsarContainerProperties<T> containerProperties);
36+
37+
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

+136-1
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import org.apache.pulsar.client.api.ProducerBuilder;
2626
import org.apache.pulsar.client.api.PulsarClient;
2727
import org.apache.pulsar.client.api.ReaderBuilder;
28+
import org.apache.pulsar.client.api.SubscriptionType;
2829
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
2930
import org.apache.pulsar.common.schema.SchemaType;
31+
import org.assertj.core.api.InstanceOfAssertFactories;
3032
import org.junit.jupiter.api.Nested;
3133
import org.junit.jupiter.api.Test;
3234
import org.junit.jupiter.params.ParameterizedTest;
@@ -47,6 +49,7 @@
4749
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
4850
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
4951
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
52+
import org.springframework.pulsar.config.PulsarReaderContainerFactory;
5053
import org.springframework.pulsar.config.PulsarReaderEndpointRegistry;
5154
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
5255
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
@@ -65,6 +68,7 @@
6568
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
6669
import org.springframework.pulsar.core.SchemaResolver;
6770
import org.springframework.pulsar.core.TopicResolver;
71+
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
6872

6973
import static org.assertj.core.api.Assertions.assertThat;
7074
import static org.mockito.Mockito.mock;
@@ -390,7 +394,7 @@ ConsumerBuilderCustomizer<?> customizerBar() {
390394
}
391395

392396
@Nested
393-
class ListenerTests {
397+
class ListenerContainerTests {
394398

395399
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
396400

@@ -464,6 +468,53 @@ void whenObservationsDisabledDoesNotEnableObservation() {
464468
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false));
465469
}
466470

471+
@Test
472+
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
473+
this.contextRunner.withPropertyValues("spring.pulsar.consumer.subscription.type=Shared")
474+
.withUserConfiguration(ContainerPropertiesCustomizerConfig.class)
475+
.run((context) -> {
476+
ConcurrentPulsarListenerContainerFactory<?> containerFactory = context
477+
.getBean(ConcurrentPulsarListenerContainerFactory.class);
478+
// We use subscriptionType to prove user customizers come after base
479+
// props customizer.
480+
// We use subscriptionName to prove user customizers are applied in
481+
// their order.
482+
assertThat(containerFactory)
483+
.extracting(ConcurrentPulsarListenerContainerFactory::getContainerProperties)
484+
.satisfies((containerProps) -> {
485+
assertThat(containerProps.getSubscriptionType()).isEqualTo(SubscriptionType.Failover);
486+
assertThat(containerProps.getSubscriptionName()).isEqualTo("/customizer1/customizer2");
487+
});
488+
});
489+
}
490+
491+
@TestConfiguration(proxyBeanMethods = false)
492+
static class ContainerPropertiesCustomizerConfig {
493+
494+
@Bean
495+
@Order(200)
496+
PulsarContainerPropertiesCustomizer customizerFoo() {
497+
return (props) -> {
498+
props.setSubscriptionType(SubscriptionType.Failover);
499+
String name = "%s/customizer2"
500+
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
501+
props.setSubscriptionName(name);
502+
};
503+
}
504+
505+
@Bean
506+
@Order(100)
507+
PulsarContainerPropertiesCustomizer customizerBar() {
508+
return (props) -> {
509+
props.setSubscriptionType(SubscriptionType.Failover);
510+
String name = "%s/customizer1"
511+
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
512+
props.setSubscriptionName(name);
513+
};
514+
}
515+
516+
}
517+
467518
}
468519

469520
@Nested
@@ -517,4 +568,88 @@ ReaderBuilderCustomizer<?> customizerBar() {
517568

518569
}
519570

571+
@Nested
572+
class ReaderContainerTests {
573+
574+
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
575+
576+
@Test
577+
void whenHasUserDefinedReaderContainerFactoryBeanDoesNotAutoConfigureBean() {
578+
PulsarReaderContainerFactory readerContainerFactory = mock(PulsarReaderContainerFactory.class);
579+
this.contextRunner
580+
.withBean("pulsarReaderContainerFactory", PulsarReaderContainerFactory.class,
581+
() -> readerContainerFactory)
582+
.run((context) -> assertThat(context).getBean(PulsarReaderContainerFactory.class)
583+
.isSameAs(readerContainerFactory));
584+
}
585+
586+
@Test
587+
@SuppressWarnings("rawtypes")
588+
void injectsExpectedBeans() {
589+
PulsarReaderFactory<?> readerFactory = mock(PulsarReaderFactory.class);
590+
SchemaResolver schemaResolver = mock(SchemaResolver.class);
591+
this.contextRunner.withBean("pulsarReaderFactory", PulsarReaderFactory.class, () -> readerFactory)
592+
.withBean("schemaResolver", SchemaResolver.class, () -> schemaResolver)
593+
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class)
594+
.hasFieldOrPropertyWithValue("readerFactory", readerFactory)
595+
.extracting(DefaultPulsarReaderContainerFactory::getContainerProperties)
596+
.hasFieldOrPropertyWithValue("schemaResolver", schemaResolver));
597+
}
598+
599+
@Test
600+
@SuppressWarnings("unchecked")
601+
void whenHasUserDefinedReaderAnnotationBeanPostProcessorBeanDoesNotAutoConfigureBean() {
602+
PulsarReaderAnnotationBeanPostProcessor<String> readerAnnotationBeanPostProcessor = mock(
603+
PulsarReaderAnnotationBeanPostProcessor.class);
604+
this.contextRunner
605+
.withBean("org.springframework.pulsar.config.internalPulsarReaderAnnotationProcessor",
606+
PulsarReaderAnnotationBeanPostProcessor.class, () -> readerAnnotationBeanPostProcessor)
607+
.run((context) -> assertThat(context).getBean(PulsarReaderAnnotationBeanPostProcessor.class)
608+
.isSameAs(readerAnnotationBeanPostProcessor));
609+
}
610+
611+
@Test
612+
void whenHasCustomProperties() {
613+
List<String> properties = new ArrayList<>();
614+
properties.add("spring.pulsar.reader.topics=fromPropsCustomizer");
615+
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)).run((context) -> {
616+
DefaultPulsarReaderContainerFactory<?> factory = context
617+
.getBean(DefaultPulsarReaderContainerFactory.class);
618+
assertThat(factory.getContainerProperties().getTopics()).containsExactly("fromPropsCustomizer");
619+
});
620+
}
621+
622+
@Test
623+
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
624+
this.contextRunner.withPropertyValues("spring.pulsar.reader.topics=fromPropsCustomizer")
625+
.withUserConfiguration(ReaderContainerPropertiesCustomizerConfig.class)
626+
.run((context) -> {
627+
DefaultPulsarReaderContainerFactory<?> containerFactory = context
628+
.getBean(DefaultPulsarReaderContainerFactory.class);
629+
assertThat(containerFactory).extracting(DefaultPulsarReaderContainerFactory::getContainerProperties)
630+
.extracting(PulsarReaderContainerProperties::getTopics,
631+
InstanceOfAssertFactories.list(String.class))
632+
.containsExactly("fromPropsCustomizer", "customizer1", "customizer2");
633+
});
634+
}
635+
636+
@TestConfiguration(proxyBeanMethods = false)
637+
static class ReaderContainerPropertiesCustomizerConfig {
638+
639+
@Bean
640+
@Order(200)
641+
PulsarReaderContainerPropertiesCustomizer customizerFoo() {
642+
return (props) -> props.getTopics().add("customizer2");
643+
}
644+
645+
@Bean
646+
@Order(100)
647+
PulsarReaderContainerPropertiesCustomizer customizerBar() {
648+
return (props) -> props.getTopics().add("customizer1");
649+
}
650+
651+
}
652+
653+
}
654+
520655
}

0 commit comments

Comments
 (0)