Skip to content

Commit 202d485

Browse files
committed
[Fix #484] Status related changes
Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 74fc958 commit 202d485

22 files changed

+364
-320
lines changed

impl/core/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,10 @@
5050
<artifactId>assertj-core</artifactId>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>ch.qos.logback</groupId>
55+
<artifactId>logback-classic</artifactId>
56+
<scope>test</scope>
57+
</dependency>
5358
</dependencies>
5459
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.function.BiFunction;
19+
20+
@FunctionalInterface
21+
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,6 @@ public TaskContext(JsonNode input, WorkflowPosition position) {
4141
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
4242
}
4343

44-
public TaskContext<T> copy() {
45-
return new TaskContext<T>(
46-
rawInput,
47-
task,
48-
position.copy(),
49-
startedAt,
50-
input,
51-
output,
52-
rawOutput,
53-
flowDirective,
54-
new HashMap<>(contextVariables));
55-
}
56-
5744
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
5845
this(
5946
input,
@@ -88,6 +75,19 @@ private TaskContext(
8875
this.contextVariables = contextVariables;
8976
}
9077

78+
public TaskContext<T> copy() {
79+
return new TaskContext<T>(
80+
rawInput,
81+
task,
82+
position.copy(),
83+
startedAt,
84+
input,
85+
output,
86+
rawOutput,
87+
flowDirective,
88+
new HashMap<>(contextVariables));
89+
}
90+
9191
public void input(JsonNode input) {
9292
this.input = input;
9393
this.rawOutput = input;

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,19 @@ public class WorkflowDefinition implements AutoCloseable {
4949

5050
private WorkflowDefinition(
5151
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
52-
5352
this.workflow = workflow;
5453
this.application = application;
5554
this.resourceLoader = resourceLoader;
5655
if (workflow.getInput() != null) {
5756
Input input = workflow.getInput();
5857
this.inputSchemaValidator =
59-
getSchemaValidator(
60-
application.validatorFactory(), schemaToNode(resourceLoader, input.getSchema()));
58+
getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema());
6159
this.inputFilter = buildWorkflowFilter(application.expressionFactory(), input.getFrom());
6260
}
6361
if (workflow.getOutput() != null) {
6462
Output output = workflow.getOutput();
6563
this.outputSchemaValidator =
66-
getSchemaValidator(
67-
application.validatorFactory(), schemaToNode(resourceLoader, output.getSchema()));
64+
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
6865
this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs());
6966
}
7067
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.fasterxml.jackson.databind.node.NullNode;
22+
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2223
import java.time.Instant;
2324
import java.util.concurrent.atomic.AtomicReference;
2425

2526
public class WorkflowInstance {
26-
private final AtomicReference<WorkflowState> state;
27+
private final AtomicReference<WorkflowStatus> status;
2728
private final TaskContext<?> taskContext;
2829
private final String id;
2930
private final JsonNode input;
@@ -40,15 +41,16 @@ public class WorkflowInstance {
4041
definition
4142
.inputFilter()
4243
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
43-
state = new AtomicReference<>(WorkflowState.STARTED);
44+
status = new AtomicReference<>(WorkflowStatus.RUNNING);
4445
context = new AtomicReference<>(NullNode.getInstance());
45-
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
46+
TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
4647
definition
4748
.outputFilter()
4849
.ifPresent(
4950
f ->
5051
taskContext.output(f.apply(workflowContext, taskContext, taskContext.rawOutput())));
5152
definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output()));
53+
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
5254
}
5355

5456
public String id() {
@@ -67,12 +69,12 @@ public JsonNode context() {
6769
return context.get();
6870
}
6971

70-
public WorkflowState state() {
71-
return state.get();
72+
public WorkflowStatus status() {
73+
return status.get();
7274
}
7375

74-
public void state(WorkflowState state) {
75-
this.state.set(state);
76+
public void status(WorkflowStatus state) {
77+
this.status.set(state);
7678
}
7779

7880
public Object output() {

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
public enum WorkflowState {
19-
STARTED,
18+
public enum WorkflowStatus {
19+
PENDING,
20+
RUNNING,
2021
WAITING,
21-
COMPLETED
22+
COMPLETED,
23+
FAULTED,
24+
CANCELLED
2225
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

+21-78
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
import com.fasterxml.jackson.databind.ObjectMapper;
2020
import io.serverlessworkflow.api.WorkflowFormat;
2121
import io.serverlessworkflow.api.types.ExportAs;
22-
import io.serverlessworkflow.api.types.FlowDirective;
2322
import io.serverlessworkflow.api.types.InputFrom;
2423
import io.serverlessworkflow.api.types.OutputAs;
2524
import io.serverlessworkflow.api.types.SchemaExternal;
2625
import io.serverlessworkflow.api.types.SchemaInline;
2726
import io.serverlessworkflow.api.types.SchemaUnion;
28-
import io.serverlessworkflow.api.types.TaskBase;
29-
import io.serverlessworkflow.api.types.TaskItem;
3027
import io.serverlessworkflow.impl.expressions.Expression;
3128
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
3229
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
@@ -38,8 +35,6 @@
3835
import java.io.IOException;
3936
import java.io.InputStream;
4037
import java.io.UncheckedIOException;
41-
import java.util.List;
42-
import java.util.ListIterator;
4338
import java.util.Map;
4439
import java.util.Optional;
4540

@@ -48,11 +43,12 @@ public class WorkflowUtils {
4843
private WorkflowUtils() {}
4944

5045
public static Optional<SchemaValidator> getSchemaValidator(
51-
SchemaValidatorFactory validatorFactory, Optional<JsonNode> node) {
52-
return node.map(n -> validatorFactory.getValidator(n));
46+
SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) {
47+
return schemaToNode(resourceLoader, schema).map(n -> validatorFactory.getValidator(n));
5348
}
5449

55-
public static Optional<JsonNode> schemaToNode(ResourceLoader resourceLoader, SchemaUnion schema) {
50+
private static Optional<JsonNode> schemaToNode(
51+
ResourceLoader resourceLoader, SchemaUnion schema) {
5652
if (schema != null) {
5753
if (schema.getSchemaInline() != null) {
5854
SchemaInline inline = schema.getSchemaInline();
@@ -94,18 +90,22 @@ public static Optional<WorkflowFilter> buildWorkflowFilter(
9490

9591
public static StringFilter buildStringFilter(
9692
ExpressionFactory exprFactory, String expression, String literal) {
97-
return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal);
93+
return expression != null
94+
? toString(buildWorkflowFilter(exprFactory, expression))
95+
: toString(literal);
9896
}
9997

10098
public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) {
101-
return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str);
99+
return ExpressionUtils.isExpr(str)
100+
? toString(buildWorkflowFilter(exprFactory, str))
101+
: toString(str);
102102
}
103103

104-
public static StringFilter from(WorkflowFilter filter) {
104+
private static StringFilter toString(WorkflowFilter filter) {
105105
return (w, t) -> filter.apply(w, t, t.input()).asText();
106106
}
107107

108-
private static StringFilter from(String literal) {
108+
private static StringFilter toString(String literal) {
109109
return (w, t) -> literal;
110110
}
111111

@@ -124,76 +124,19 @@ private static WorkflowFilter buildWorkflowFilter(
124124
throw new IllegalStateException("Both object and str are null");
125125
}
126126

127-
private static TaskItem findTaskByName(ListIterator<TaskItem> iter, String taskName) {
128-
int currentIndex = iter.nextIndex();
129-
while (iter.hasPrevious()) {
130-
TaskItem item = iter.previous();
131-
if (item.getName().equals(taskName)) {
132-
return item;
133-
}
134-
}
135-
while (iter.nextIndex() < currentIndex) {
136-
iter.next();
137-
}
138-
while (iter.hasNext()) {
139-
TaskItem item = iter.next();
140-
if (item.getName().equals(taskName)) {
141-
return item;
142-
}
143-
}
144-
throw new IllegalArgumentException("Cannot find task with name " + taskName);
127+
public static LongFilter buildLongFilter(
128+
ExpressionFactory exprFactory, String expression, Long literal) {
129+
return expression != null
130+
? toLong(buildWorkflowFilter(exprFactory, expression))
131+
: toLong(literal);
145132
}
146133

147-
public static void processTaskList(
148-
List<TaskItem> tasks, WorkflowContext context, TaskContext<?> parentTask) {
149-
parentTask.position().addProperty("do");
150-
TaskContext<? extends TaskBase> currentContext = parentTask;
151-
if (!tasks.isEmpty()) {
152-
ListIterator<TaskItem> iter = tasks.listIterator();
153-
TaskItem nextTask = iter.next();
154-
while (nextTask != null) {
155-
TaskItem task = nextTask;
156-
parentTask.position().addIndex(iter.previousIndex());
157-
currentContext = executeTask(context, parentTask, task, currentContext.output());
158-
FlowDirective flowDirective = currentContext.flowDirective();
159-
if (flowDirective.getFlowDirectiveEnum() != null) {
160-
switch (flowDirective.getFlowDirectiveEnum()) {
161-
case CONTINUE:
162-
nextTask = iter.hasNext() ? iter.next() : null;
163-
break;
164-
case END:
165-
context.instance().state(WorkflowState.COMPLETED);
166-
case EXIT:
167-
nextTask = null;
168-
break;
169-
}
170-
} else {
171-
nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString());
172-
}
173-
parentTask.position().back();
174-
}
175-
}
176-
parentTask.position().back();
177-
parentTask.rawOutput(currentContext.output());
134+
private static LongFilter toLong(WorkflowFilter filter) {
135+
return (w, t) -> filter.apply(w, t, t.input()).asLong();
178136
}
179137

180-
public static TaskContext<?> executeTask(
181-
WorkflowContext context, TaskContext<?> parentTask, TaskItem task, JsonNode input) {
182-
parentTask.position().addProperty(task.getName());
183-
TaskContext<?> result =
184-
context
185-
.definition()
186-
.taskExecutors()
187-
.computeIfAbsent(
188-
parentTask.position().jsonPointer(),
189-
k ->
190-
context
191-
.definition()
192-
.taskFactory()
193-
.getTaskExecutor(task.getTask(), context.definition()))
194-
.apply(context, parentTask, input);
195-
parentTask.position().back();
196-
return result;
138+
private static LongFilter toLong(Long literal) {
139+
return (w, t) -> literal;
197140
}
198141

199142
public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {

0 commit comments

Comments
 (0)