diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index f51b7a01..23ca9a22 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -49,12 +49,14 @@ public abstract class AbstractTaskExecutor implements TaskEx private final Optional inputSchemaValidator; private final Optional outputSchemaValidator; private final Optional contextSchemaValidator; + private final Optional ifFilter; public abstract static class AbstractTaskExecutorBuilder implements TaskExecutorBuilder { private Optional inputProcessor = Optional.empty(); private Optional outputProcessor = Optional.empty(); private Optional contextProcessor = Optional.empty(); + private Optional ifFilter = Optional.empty(); private Optional inputSchemaValidator = Optional.empty(); private Optional outputSchemaValidator = Optional.empty(); private Optional contextSchemaValidator = Optional.empty(); @@ -100,6 +102,7 @@ protected AbstractTaskExecutorBuilder( this.contextSchemaValidator = getSchemaValidator(application.validatorFactory(), resourceLoader, export.getSchema()); } + this.ifFilter = optionalFilter(application.expressionFactory(), task.getIf()); } protected final TransitionInfoBuilder next( @@ -153,6 +156,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder builder) { this.inputSchemaValidator = builder.inputSchemaValidator; this.outputSchemaValidator = builder.outputSchemaValidator; this.contextSchemaValidator = builder.contextSchemaValidator; + this.ifFilter = builder.ifFilter; } protected final CompletableFuture executeNext( @@ -177,40 +181,49 @@ public CompletableFuture apply( if (!TaskExecutorHelper.isActive(workflowContext)) { return completable; } - return executeNext( - completable - .thenApply( - t -> { - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskStarted(position, task)); - inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); - inputProcessor.ifPresent( - p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); - return t; - }) - .thenCompose(t -> execute(workflowContext, t)) - .thenApply( - t -> { - outputProcessor.ifPresent( - p -> t.output(p.apply(workflowContext, t, t.rawOutput()))); - outputSchemaValidator.ifPresent(s -> s.validate(t.output())); - contextProcessor.ifPresent( - p -> - workflowContext.context( - p.apply(workflowContext, t, workflowContext.context()))); - contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); - t.completedAt(Instant.now()); - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskEnded(position, task)); - return t; - }), - workflowContext); + if (ifFilter + .map(f -> f.apply(workflowContext, taskContext, input).asBoolean(true)) + .orElse(true)) { + return executeNext( + completable + .thenApply( + t -> { + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskStarted(position, task)); + inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); + inputProcessor.ifPresent( + p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); + return t; + }) + .thenCompose(t -> execute(workflowContext, t)) + .thenApply( + t -> { + outputProcessor.ifPresent( + p -> t.output(p.apply(workflowContext, t, t.rawOutput()))); + outputSchemaValidator.ifPresent(s -> s.validate(t.output())); + contextProcessor.ifPresent( + p -> + workflowContext.context( + p.apply(workflowContext, t, workflowContext.context()))); + contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); + t.completedAt(Instant.now()); + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskEnded(position, task)); + return t; + }), + workflowContext); + } else { + taskContext.transition(getSkipTransition()); + return executeNext(completable, workflowContext); + } } + protected abstract TransitionInfo getSkipTransition(); + protected abstract CompletableFuture execute( WorkflowContext workflow, TaskContext taskContext); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java index 24c1e841..7cac9a8e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java @@ -54,6 +54,11 @@ public void connect(Map> connections) { } } + @Override + protected TransitionInfo getSkipTransition() { + return transition; + } + protected CompletableFuture execute( WorkflowContext workflow, TaskContext taskContext) { CompletableFuture future = diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java index 70b127c4..9bd3a74a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java @@ -79,6 +79,11 @@ protected TaskExecutor buildInstance() { } } + @Override + protected TransitionInfo getSkipTransition() { + return defaultTask; + } + private SwitchExecutor(SwitchExecutorBuilder builder) { super(builder); this.defaultTask = TransitionInfo.build(builder.defaultTask); diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index 4ea87283..4a0a073f 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -81,6 +81,14 @@ private static Stream provideParameters() { "simple-expression.yaml", Map.of("input", Arrays.asList(1, 2, 3)), WorkflowDefinitionTest::checkSpecialKeywords), + args( + "conditional-set.yaml", + Map.of("enabled", true), + WorkflowDefinitionTest::checkEnableCondition), + args( + "conditional-set.yaml", + Map.of("enabled", false), + WorkflowDefinitionTest::checkDisableCondition), args( "raise-inline copy.yaml", WorkflowDefinitionTest::checkWorkflowException, @@ -166,4 +174,14 @@ private static void checkSpecialKeywords(Object obj) { assertThat(result.get("id").toString()).hasSize(26); assertThat(result.get("version").toString()).contains("alpha"); } + + private static void checkEnableCondition(Object obj) { + Map result = (Map) obj; + assertThat(result.get("name")).isEqualTo("javierito"); + } + + private static void checkDisableCondition(Object obj) { + Map result = (Map) obj; + assertThat(result.get("enabled")).isEqualTo(false); + } } diff --git a/impl/core/src/test/resources/conditional-set.yaml b/impl/core/src/test/resources/conditional-set.yaml new file mode 100644 index 00000000..5e06622d --- /dev/null +++ b/impl/core/src/test/resources/conditional-set.yaml @@ -0,0 +1,10 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: conditional-set + version: '0.1.0' +do: + - conditionalExpression: + if: .enabled + set: + name: javierito \ No newline at end of file