From 00b9ea518d669aae71fc02a5b4cdba3acf0f1d6d Mon Sep 17 00:00:00 2001 From: 2tsumo-hitori Date: Sun, 9 Mar 2025 13:37:58 +0900 Subject: [PATCH] Add batch message splitting and unit tests - Added a feature that transforms a Message containing a Collection payload into individual Message objects. - This enables easier processing of batch messages in frameworks like Spring Cloud Stream. - Introduced `enableSplitting` boolean variable in `FunctionInvocationWrapper` to control message splitting behavior. - Made `enableSplitting` configurable at initialization, ensuring that the existing behavior remains unaffected if not set. Signed-off-by: 2tsumo-hitori --- .../catalog/SimpleFunctionRegistry.java | 17 ++++++++ ...BeanFactoryAwareFunctionRegistryTests.java | 42 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index cdf6380af..650a27643 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -87,6 +87,7 @@ * @author Roman Samarev * @author Soby Chacko * @author Chris Bono + * @author 2tsumo-hitori */ public class SimpleFunctionRegistry implements FunctionRegistry { protected Log logger = LogFactory.getLog(this.getClass()); @@ -419,6 +420,8 @@ public class FunctionInvocationWrapper implements Function, Cons protected boolean wrapped; + private boolean enableSplitting = false; + private final ThreadLocal> unconvertedResult = new ThreadLocal<>(); private PostProcessingFunction postProcessor; @@ -575,6 +578,14 @@ public Class getRawInputType() { return this.inputType == null ? null : FunctionTypeUtils.getRawType(this.inputType); } + public boolean isEnableSplitting() { + return enableSplitting; + } + + public void setEnableSplitting(boolean enableSplitting) { + this.enableSplitting = enableSplitting; + } + /** * */ @@ -815,6 +826,9 @@ private Object enrichInvocationResultIfNecessary(Object input, Object result) { if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) { result = functionInvocationHelper.postProcessResult(result, (Message) input); } + else if (this.enableSplitting && !FunctionTypeUtils.isCollectionOfMessage(this.outputType)) { + result = ((Collection) result).stream().map(it -> MessageBuilder.withPayload(it).copyHeaders(sanitizeHeaders(((Message) input).getHeaders())).build()).toList(); + } else if (!FunctionTypeUtils.isCollectionOfMessage(this.outputType)) { result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); } @@ -1266,6 +1280,9 @@ else if (convertedOutput instanceof Message) { else if (convertedOutput instanceof Collection && this.isOutputTypeMessage()) { convertedOutput = this.convertMultipleOutputValuesIfNecessary(convertedOutput, ObjectUtils.isEmpty(contentType) ? null : contentType); } + else if (convertedOutput instanceof Collection && this.isEnableSplitting()) { + convertedOutput = this.convertMultipleOutputValuesIfNecessary(convertedOutput, ObjectUtils.isEmpty(contentType) ? null : contentType); + } else if (ObjectUtils.isArray(convertedOutput) && !(convertedOutput instanceof byte[])) { convertedOutput = this.convertMultipleOutputValuesIfNecessary(convertedOutput, ObjectUtils.isEmpty(contentType) ? null : contentType); } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 24c07e4fc..d63972eae 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -880,6 +880,38 @@ public void testConcurrencyOnLookup() throws Exception { assertThat(counter.get()).isEqualTo(100); } + @SuppressWarnings("rawtypes") + @Test + void useBatchMessageSplitterTest() { + FunctionCatalog functionCatalog = this.configureCatalog(BatchMessageSplitterConfiguration.class); + FunctionInvocationWrapper function = functionCatalog.lookup("upperCase", "application/json"); + + function.setEnableSplitting(true); + + List payload = List.of("2tsumo", "-", "hitori"); + + List result = (List) function.apply(MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build()); + + assertThat(((Collection) result)).isNotEmpty(); + assertThat(((Message) (result.get(0))).getPayload()).isEqualTo("\"2TSUMO\"".getBytes(StandardCharsets.UTF_8)); + assertThat(((Message) (result.get(1))).getPayload()).isEqualTo("\"-\"".getBytes(StandardCharsets.UTF_8)); + assertThat(((Message) (result.get(2))).getPayload()).isEqualTo("\"HITORI\"".getBytes(StandardCharsets.UTF_8)); + } + + @SuppressWarnings("rawtypes") + @Test + void noUseBatchMessageSplitterTest() { + FunctionCatalog functioncatalog = this.configureCatalog(BatchMessageSplitterConfiguration.class); + FunctionInvocationWrapper function = functioncatalog.lookup("upperCase", "application/json"); + + function.setEnableSplitting(false); + + List payload = List.of("2tsumo", "-", "hitori"); + + Message result = (Message) function.apply(MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build()); + assertThat(result.getPayload()).isEqualTo("[\"2TSUMO\",\"-\",\"HITORI\"]".getBytes(StandardCharsets.UTF_8)); + } + @EnableAutoConfiguration public static class PojoToMessageFunctionCompositionConfiguration { @@ -1608,4 +1640,14 @@ public Function, Message> myFunction() { return msg -> msg; } } + + @EnableAutoConfiguration + @Configuration + protected static class BatchMessageSplitterConfiguration { + + @Bean + public Function>, List> upperCase() { + return messages -> messages.getPayload().stream().map(String::toUpperCase).toList(); + } + } }