Skip to content

Commit 00b9ea5

Browse files
committed
Add batch message splitting and unit tests
- Added a feature that transforms a Message containing a Collection payload into individual Message objects. - This enables easier processing of batch messages in frameworks like Spring Cloud Stream. - Introduced `enableSplitting` boolean variable in `FunctionInvocationWrapper` to control message splitting behavior. - Made `enableSplitting` configurable at initialization, ensuring that the existing behavior remains unaffected if not set. Signed-off-by: 2tsumo-hitori <[email protected]>
1 parent 1234a94 commit 00b9ea5

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

Diff for: spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

+17
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
* @author Roman Samarev
8888
* @author Soby Chacko
8989
* @author Chris Bono
90+
* @author 2tsumo-hitori
9091
*/
9192
public class SimpleFunctionRegistry implements FunctionRegistry {
9293
protected Log logger = LogFactory.getLog(this.getClass());
@@ -419,6 +420,8 @@ public class FunctionInvocationWrapper implements Function<Object, Object>, Cons
419420

420421
protected boolean wrapped;
421422

423+
private boolean enableSplitting = false;
424+
422425
private final ThreadLocal<Message<Object>> unconvertedResult = new ThreadLocal<>();
423426

424427
private PostProcessingFunction postProcessor;
@@ -575,6 +578,14 @@ public Class<?> getRawInputType() {
575578
return this.inputType == null ? null : FunctionTypeUtils.getRawType(this.inputType);
576579
}
577580

581+
public boolean isEnableSplitting() {
582+
return enableSplitting;
583+
}
584+
585+
public void setEnableSplitting(boolean enableSplitting) {
586+
this.enableSplitting = enableSplitting;
587+
}
588+
578589
/**
579590
*
580591
*/
@@ -815,6 +826,9 @@ private Object enrichInvocationResultIfNecessary(Object input, Object result) {
815826
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {
816827
result = functionInvocationHelper.postProcessResult(result, (Message) input);
817828
}
829+
else if (this.enableSplitting && !FunctionTypeUtils.isCollectionOfMessage(this.outputType)) {
830+
result = ((Collection<?>) result).stream().map(it -> MessageBuilder.withPayload(it).copyHeaders(sanitizeHeaders(((Message) input).getHeaders())).build()).toList();
831+
}
818832
else if (!FunctionTypeUtils.isCollectionOfMessage(this.outputType)) {
819833
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
820834
}
@@ -1266,6 +1280,9 @@ else if (convertedOutput instanceof Message) {
12661280
else if (convertedOutput instanceof Collection && this.isOutputTypeMessage()) {
12671281
convertedOutput = this.convertMultipleOutputValuesIfNecessary(convertedOutput, ObjectUtils.isEmpty(contentType) ? null : contentType);
12681282
}
1283+
else if (convertedOutput instanceof Collection && this.isEnableSplitting()) {
1284+
convertedOutput = this.convertMultipleOutputValuesIfNecessary(convertedOutput, ObjectUtils.isEmpty(contentType) ? null : contentType);
1285+
}
12691286
else if (ObjectUtils.isArray(convertedOutput) && !(convertedOutput instanceof byte[])) {
12701287
convertedOutput = this.convertMultipleOutputValuesIfNecessary(convertedOutput, ObjectUtils.isEmpty(contentType) ? null : contentType);
12711288
}

Diff for: spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java

+42
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,38 @@ public void testConcurrencyOnLookup() throws Exception {
880880
assertThat(counter.get()).isEqualTo(100);
881881
}
882882

883+
@SuppressWarnings("rawtypes")
884+
@Test
885+
void useBatchMessageSplitterTest() {
886+
FunctionCatalog functionCatalog = this.configureCatalog(BatchMessageSplitterConfiguration.class);
887+
FunctionInvocationWrapper function = functionCatalog.lookup("upperCase", "application/json");
888+
889+
function.setEnableSplitting(true);
890+
891+
List payload = List.of("2tsumo", "-", "hitori");
892+
893+
List result = (List) function.apply(MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build());
894+
895+
assertThat(((Collection) result)).isNotEmpty();
896+
assertThat(((Message) (result.get(0))).getPayload()).isEqualTo("\"2TSUMO\"".getBytes(StandardCharsets.UTF_8));
897+
assertThat(((Message) (result.get(1))).getPayload()).isEqualTo("\"-\"".getBytes(StandardCharsets.UTF_8));
898+
assertThat(((Message) (result.get(2))).getPayload()).isEqualTo("\"HITORI\"".getBytes(StandardCharsets.UTF_8));
899+
}
900+
901+
@SuppressWarnings("rawtypes")
902+
@Test
903+
void noUseBatchMessageSplitterTest() {
904+
FunctionCatalog functioncatalog = this.configureCatalog(BatchMessageSplitterConfiguration.class);
905+
FunctionInvocationWrapper function = functioncatalog.lookup("upperCase", "application/json");
906+
907+
function.setEnableSplitting(false);
908+
909+
List payload = List.of("2tsumo", "-", "hitori");
910+
911+
Message result = (Message) function.apply(MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build());
912+
assertThat(result.getPayload()).isEqualTo("[\"2TSUMO\",\"-\",\"HITORI\"]".getBytes(StandardCharsets.UTF_8));
913+
}
914+
883915
@EnableAutoConfiguration
884916
public static class PojoToMessageFunctionCompositionConfiguration {
885917

@@ -1608,4 +1640,14 @@ public Function<Message<?>, Message<?>> myFunction() {
16081640
return msg -> msg;
16091641
}
16101642
}
1643+
1644+
@EnableAutoConfiguration
1645+
@Configuration
1646+
protected static class BatchMessageSplitterConfiguration {
1647+
1648+
@Bean
1649+
public Function<Message<List<String>>, List<String>> upperCase() {
1650+
return messages -> messages.getPayload().stream().map(String::toUpperCase).toList();
1651+
}
1652+
}
16111653
}

0 commit comments

Comments
 (0)