Skip to content

Commit 3ebce88

Browse files
authored
GH-107: Make Splitter Function as Flux-based
Fixes: #107 When we have a composition like this: ``` spring.cloud.function.definition = fileSupplier|splitterFunction ``` Then final "function" signature is like this `Supplier<Flux<Message<List<Message<?>>>>>`. And that is exactly what we don't expected from the splitter in the end of the composition. While Spring Cloud Stream supports de-batching, it works for a `List` output only if function is bound by itself. In case of composition we got just a `Supplier`. * Rework `SplitterFunctionConfiguration` for `splitterFunction` from `Function<Message<?>, List<Message<?>>>` to `Function<Flux<Message<?>>, Flux<Message<?>>>` signature to support every possible simple and composed bindings in Spring Cloud Stream * Rework `SplitterFunctionApplicationTests` for new expected `Function<Flux<Message<?>>, Flux<Message<?>>>` signature * Rework `zip-split-rabbit-binder` sample to not use a `flattenFunction` workaround and fully rely on whatever is new for the `splitterFunction` * Fix `ZipSplitRabbitBinderApplicationTests` moving the `@RabbitListener` into a `@TestConfiguration`. Apparently in a new Spring Boot version the test class is registered as a bean much later than normal application context startup. Therefore, even if the `@RabbitListener` parsed and registered properly, the `RabbitAdmin` bean has been already started to see our extra bean definition for the `@QueueBinding` Changing signature for the splitterFunction to reactive types would make it working even with a Supplier composition. Fix JDBC & MongoDB suppliers to deal with a new version of Splitter function Fix Checkstyle violations Use `IntegrationReactiveUtils.messageSourceToFlux()` API The `IntegrationReactiveUtils.messageSourceToFlux()` provides convenient API to represent a `MessageSource` as a `Flux` to poll this source. The API has an error handling logic and delay when no data emitted by the source * Remove `org.springframework.cloud` dependencies from the project since we don't use `@PollableBean` anymore, which comes from the `spring-cloud-function-context` * Simplify `JdbcSupplierConfiguration` and `MongodbSupplierConfiguration` code more: more injections to the respective bean method. * Use `(__) ->` lambda syntax for unused argument * Remove unused `ThreadLocalFluxSinkMessageChannel` internal class * Update Copyrights of the classes in this change Upgrade to Gradle `8.12`
1 parent 9ce6a1b commit 3ebce88

File tree

18 files changed

+69
-133
lines changed

18 files changed

+69
-133
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ allprojects {
5858
mavenBom "io.debezium:debezium-bom:$debeziumVersion"
5959
mavenBom "io.awspring.cloud:spring-cloud-aws-dependencies:$springCloudAwsVersion"
6060
mavenBom "org.springframework.boot:spring-boot-dependencies:$springBootVersion"
61-
mavenBom "org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion"
6261
mavenBom "ai.djl:bom:$djlVersion"
6362
}
6463
}

dependencies.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
ext {
22
springBootVersion = '3.4.1'
3-
springCloudVersion = '2024.0.0'
43
springCloudAwsVersion = '3.2.1'
54

65
debeziumVersion = '3.0.6.Final'

function/spring-splitter-function/src/main/java/org/springframework/cloud/fn/splitter/SplitterFunctionConfiguration.java

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2024 the original author or authors.
2+
* Copyright 2011-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,11 +17,9 @@
1717
package org.springframework.cloud.fn.splitter;
1818

1919
import java.nio.charset.Charset;
20-
import java.util.List;
2120
import java.util.Optional;
2221
import java.util.function.Function;
2322

24-
import org.reactivestreams.Publisher;
2523
import reactor.core.publisher.Flux;
2624

2725
import org.springframework.beans.factory.annotation.Qualifier;
@@ -32,13 +30,12 @@
3230
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3331
import org.springframework.context.annotation.Bean;
3432
import org.springframework.context.annotation.Conditional;
35-
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
33+
import org.springframework.integration.channel.FluxMessageChannel;
3634
import org.springframework.integration.file.splitter.FileSplitter;
3735
import org.springframework.integration.splitter.AbstractMessageSplitter;
3836
import org.springframework.integration.splitter.DefaultMessageSplitter;
3937
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
4038
import org.springframework.messaging.Message;
41-
import org.springframework.messaging.MessageChannel;
4239

4340
/**
4441
* Auto-configuration for Splitter function.
@@ -51,7 +48,7 @@
5148
public class SplitterFunctionConfiguration {
5249

5350
@Bean
54-
public Function<Message<?>, List<Message<?>>> splitterFunction(
51+
public Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction(
5552
@Qualifier("expressionSplitter") Optional<AbstractMessageSplitter> expressionSplitter,
5653
@Qualifier("fileSplitter") Optional<AbstractMessageSplitter> fileSplitter,
5754
@Qualifier("defaultSplitter") Optional<AbstractMessageSplitter> defaultSplitter,
@@ -60,13 +57,13 @@ public Function<Message<?>, List<Message<?>>> splitterFunction(
6057
AbstractMessageSplitter messageSplitter = expressionSplitter.or(() -> fileSplitter)
6158
.or(() -> defaultSplitter)
6259
.get();
60+
6361
messageSplitter.setApplySequence(splitterFunctionProperties.isApplySequence());
64-
ThreadLocalFluxSinkMessageChannel outputChannel = new ThreadLocalFluxSinkMessageChannel();
62+
FluxMessageChannel inputChannel = new FluxMessageChannel();
63+
inputChannel.subscribe(messageSplitter);
64+
FluxMessageChannel outputChannel = new FluxMessageChannel();
6565
messageSplitter.setOutputChannel(outputChannel);
66-
return (message) -> {
67-
messageSplitter.handleMessage(message);
68-
return outputChannel.publisherThreadLocal.get();
69-
};
66+
return (messageFlux) -> Flux.from(outputChannel).doOnRequest((__) -> inputChannel.subscribeTo(messageFlux));
7067
}
7168

7269
@Bean
@@ -117,22 +114,4 @@ static class FileMarkers {
117114

118115
}
119116

120-
private static final class ThreadLocalFluxSinkMessageChannel
121-
implements MessageChannel, ReactiveStreamsSubscribableChannel {
122-
123-
private final ThreadLocal<List<Message<?>>> publisherThreadLocal = new ThreadLocal<>();
124-
125-
@Override
126-
@SuppressWarnings("unchecked")
127-
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
128-
this.publisherThreadLocal.set(Flux.from(publisher).collectList().cast(List.class).block());
129-
}
130-
131-
@Override
132-
public boolean send(Message<?> message, long l) {
133-
throw new UnsupportedOperationException("This channel only supports a reactive 'subscribeTo()' ");
134-
}
135-
136-
}
137-
138117
}

function/spring-splitter-function/src/test/java/org/springframework/cloud/fn/splitter/SplitterFunctionApplicationTests.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2024 the original author or authors.
2+
* Copyright 2011-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,12 @@
1616

1717
package org.springframework.cloud.fn.splitter;
1818

19-
import java.util.List;
19+
import java.time.Duration;
2020
import java.util.function.Function;
2121

2222
import org.junit.jupiter.api.Test;
23+
import reactor.core.publisher.Flux;
24+
import reactor.test.StepVerifier;
2325

2426
import org.springframework.beans.factory.annotation.Autowired;
2527
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -28,19 +30,18 @@
2830
import org.springframework.messaging.support.GenericMessage;
2931
import org.springframework.test.annotation.DirtiesContext;
3032

31-
import static org.assertj.core.api.Assertions.assertThat;
32-
3333
@SpringBootTest(properties = "splitter.expression=payload.split(',')")
3434
@DirtiesContext
3535
public class SplitterFunctionApplicationTests {
3636

3737
@Autowired
38-
Function<Message<?>, List<Message<?>>> splitter;
38+
Function<Flux<Message<?>>, Flux<Message<?>>> splitter;
3939

4040
@Test
4141
public void testExpressionSplitter() {
42-
List<Message<?>> messageList = this.splitter.apply(new GenericMessage<>("hello,world"));
43-
assertThat(messageList).extracting((m) -> m.getPayload().toString()).contains("hello", "world");
42+
Flux<Message<?>> messageFlux = this.splitter.apply(Flux.just(new GenericMessage<>("hello,world")));
43+
Flux<String> payloads = messageFlux.map(Message::getPayload).map(Object::toString);
44+
StepVerifier.create(payloads).expectNext("hello", "world").thenCancel().verify(Duration.ofSeconds(30));
4445
}
4546

4647
@SpringBootApplication

gradle/wrapper/gradle-wrapper.jar

130 Bytes
Binary file not shown.

gradle/wrapper/gradle-wrapper.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionSha256Sum=544c35d6bd849ae8a5ed0bcea39ba677dc40f49df7d1835561582da2009b961d
4-
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
3+
distributionSha256Sum=7a00d51fb93147819aab76024feece20b6b84e420694101f276be952e08bef03
4+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-bin.zip
55
networkTimeout=10000
66
validateDistributionUrl=true
77
zipStoreBase=GRADLE_USER_HOME

gradlew

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
#
18+
# SPDX-License-Identifier: Apache-2.0
19+
#
1820

1921
##############################################################################
2022
#
@@ -55,7 +57,7 @@
5557
# Darwin, MinGW, and NonStop.
5658
#
5759
# (3) This script is generated from the Groovy template
58-
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
60+
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
5961
# within the Gradle project.
6062
#
6163
# You can find Gradle at https://github.com/gradle/gradle/.
@@ -84,7 +86,7 @@ done
8486
# shellcheck disable=SC2034
8587
APP_BASE_NAME=${0##*/}
8688
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
87-
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
89+
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
8890

8991
# Use the maximum available, or set MAX_FD != -1 to use that value.
9092
MAX_FD=maximum

gradlew.bat

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
@rem See the License for the specific language governing permissions and
1414
@rem limitations under the License.
1515
@rem
16+
@rem SPDX-License-Identifier: Apache-2.0
17+
@rem
1618

1719
@if "%DEBUG%"=="" @echo off
1820
@rem ##########################################################################

samples/zip-split-rabbit-binder/README.adoc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ The second one is for `UnZipTransformer`, which we use for a custom function to
1818
The `splitterFunction` is used in a `FileSplitter` mode to read lines from unzipped entries and emit each of them as an individual message.
1919
Essentially, we are splitting twice: zip entries, and content of each file.
2020

21-
The composition is like this: `fileSupplier|unzipFunction|splitterFunction|flattenFunction`.
22-
(The `flattenFunction` will be explained latter).
21+
The composition is like this: `fileSupplier|unzipFunction|splitterFunction.
2322
The result of this composition is a `Supplier<Flux<Mesage<?>>>` and we bind it into a RabbitMQ `unzipped_data_exchange` using Spring Cloud Stream.
2423

2524
For `fileSupplier` we provide these configuration properties:
@@ -49,8 +48,6 @@ Which is a trigger for that function to use a `FileSplitter` for zip entries to
4948
The custom `ZipSplitRabbitBinderApplication.unzipFunction()` (might be a candidate for the future Functions Catalog version) uses `Flux` API to unzip polled files via `UnZipTransformer` and then `flatMapIterable()` for zip entries.
5049
Then those entries are fed into a `splitterFunction` for `FileSplitter` mode.
5150

52-
The mention `ZipSplitRabbitBinderApplication.flattenFunction()` is needed for now here since `splitterFucntion` produces a `List<Message>` which cannot be https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/producing-and-consuming-messages.html#batch-producers[de-batched] by Spring Cloud Stream since our final product of the composition is, essentially, `Supplier<Flux<Message<?>>>`.
53-
5451
To run the application from main `ZipSplitRabbitBinderApplication` class (`./gradlew bootRun`), the RabbitMQ broker must be supplied on the target environment.
5552

5653
The test environment for this sample uses `org.springframework.boot:spring-boot-testcontainers` and `org.testcontainers:rabbitmq` to run RabbitMQ in Docker container and wire it properly into Spring Boot auto-configuration.

samples/zip-split-rabbit-binder/src/main/java/com/example/ZipSplitRabbitBinderApplication.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.example;
22

33
import java.io.File;
4-
import java.util.List;
54
import java.util.Map;
65
import java.util.function.Function;
76

@@ -34,10 +33,4 @@ Function<Flux<Message<File>>, Flux<File>> unzipFunction(UnZipTransformer unZipTr
3433
.flatMapIterable(Map::values);
3534
}
3635

37-
// TODO until 'splitterFunction' is fixed this way: https://github.com/spring-cloud/spring-functions-catalog/issues/107
38-
@Bean
39-
Function<Flux<Message<List<Message<?>>>>, Flux<Message<?>>> flattenFunction() {
40-
return messageFlux -> messageFlux.map(Message::getPayload).flatMapIterable(Function.identity());
41-
}
42-
4336
}

samples/zip-split-rabbit-binder/src/main/resources/application.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ spring:
44

55
cloud:
66
function:
7-
definition: fileSupplier|unzipFunction|splitterFunction|flattenFunction
7+
definition: fileSupplier|unzipFunction|splitterFunction
88

99
stream:
1010
bindings:
11-
fileSupplier|unzipFunction|splitterFunction|flattenFunction-out-0:
11+
fileSupplier|unzipFunction|splitterFunction-out-0:
1212
destination: unzipped_data_exchange
1313

1414
file:

samples/zip-split-rabbit-binder/src/test/java/com/example/ZipSplitRabbitBinderApplicationTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.amqp.rabbit.annotation.QueueBinding;
2020
import org.springframework.amqp.rabbit.annotation.RabbitListener;
2121
import org.springframework.boot.test.context.SpringBootTest;
22+
import org.springframework.boot.test.context.TestConfiguration;
2223
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
2324
import org.springframework.test.annotation.DirtiesContext;
2425

@@ -47,11 +48,16 @@ void zippedFilesAreSplittedToRabbitBinding() throws InterruptedException {
4748
}
4849
}
4950

50-
@RabbitListener(bindings = @QueueBinding(value = @Queue,
51-
exchange = @Exchange(value = "unzipped_data_exchange", type = ExchangeTypes.TOPIC), key = "#"))
52-
void receiveDataFromSplittedZips(String payload) {
53-
LOG.info("A line from zip entry: " + payload);
54-
DATA_SINK.offer(payload);
51+
@TestConfiguration
52+
static class RabbitListenerTestConfiguration {
53+
54+
@RabbitListener(bindings = @QueueBinding(value = @Queue,
55+
exchange = @Exchange(value = "unzipped_data_exchange", type = ExchangeTypes.TOPIC), key = "#"))
56+
void receiveDataFromSplittedZips(String payload) {
57+
LOG.info("A line from zip entry: " + payload);
58+
DATA_SINK.offer(payload);
59+
}
60+
5561
}
5662

5763
}

supplier/spring-jdbc-supplier/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ dependencies {
22
api project(':spring-splitter-function')
33
api 'org.springframework.integration:spring-integration-jdbc'
44
api 'org.springframework.boot:spring-boot-starter-jdbc'
5-
api 'org.springframework.cloud:spring-cloud-function-context'
65

76
runtimeOnly 'org.hsqldb:hsqldb'
87
runtimeOnly 'com.h2database:h2'

supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.cloud.fn.supplier.jdbc;
1818

19-
import java.util.List;
2019
import java.util.function.Function;
2120
import java.util.function.Supplier;
2221

@@ -30,10 +29,10 @@
3029
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3130
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
3231
import org.springframework.cloud.fn.splitter.SplitterFunctionConfiguration;
33-
import org.springframework.cloud.function.context.PollableBean;
3432
import org.springframework.context.annotation.Bean;
3533
import org.springframework.integration.core.MessageSource;
3634
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
35+
import org.springframework.integration.util.IntegrationReactiveUtils;
3736
import org.springframework.lang.Nullable;
3837
import org.springframework.messaging.Message;
3938

@@ -47,45 +46,26 @@
4746
@EnableConfigurationProperties(JdbcSupplierProperties.class)
4847
public class JdbcSupplierConfiguration {
4948

50-
private final JdbcSupplierProperties properties;
51-
52-
private final DataSource dataSource;
53-
54-
public JdbcSupplierConfiguration(JdbcSupplierProperties properties, DataSource dataSource) {
55-
this.properties = properties;
56-
this.dataSource = dataSource;
57-
}
58-
5949
@Bean
60-
public MessageSource<Object> jdbcMessageSource(
50+
public JdbcPollingChannelAdapter jdbcMessageSource(JdbcSupplierProperties properties, DataSource dataSource,
6151
@Nullable ComponentCustomizer<JdbcPollingChannelAdapter> jdbcPollingChannelAdapterCustomizer) {
6252

63-
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource,
64-
this.properties.getQuery());
65-
jdbcPollingChannelAdapter.setMaxRows(this.properties.getMaxRows());
66-
jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
53+
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
54+
properties.getQuery());
55+
jdbcPollingChannelAdapter.setMaxRows(properties.getMaxRows());
56+
jdbcPollingChannelAdapter.setUpdateSql(properties.getUpdate());
6757
if (jdbcPollingChannelAdapterCustomizer != null) {
6858
jdbcPollingChannelAdapterCustomizer.customize(jdbcPollingChannelAdapter);
6959
}
7060
return jdbcPollingChannelAdapter;
7161
}
7262

7363
@Bean(name = "jdbcSupplier")
74-
@PollableBean
7564
@ConditionalOnProperty(prefix = "jdbc.supplier", name = "split", matchIfMissing = true)
76-
public Supplier<Flux<Message<?>>> splittedSupplier(MessageSource<Object> jdbcMessageSource,
77-
Function<Message<?>, List<Message<?>>> splitterFunction) {
65+
public Supplier<Flux<Message<?>>> splittedSupplier(JdbcPollingChannelAdapter jdbcMessageSource,
66+
Function<Flux<Message<Object>>, Flux<Message<?>>> splitterFunction) {
7867

79-
return () -> {
80-
Message<?> received = jdbcMessageSource.receive();
81-
if (received != null) {
82-
// multiple Message<Map<String, Object>>
83-
return Flux.fromIterable(splitterFunction.apply(received));
84-
}
85-
else {
86-
return Flux.empty();
87-
}
88-
};
68+
return () -> IntegrationReactiveUtils.messageSourceToFlux(jdbcMessageSource).transform(splitterFunction);
8969
}
9070

9171
@Bean

0 commit comments

Comments
 (0)