Skip to content

Commit 74fc958

Browse files
committed
[Fix #484] Execute Fork task
Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 91bbe5c commit 74fc958

16 files changed

+473
-52
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.function.Supplier;
20+
21+
@FunctionalInterface
22+
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}

impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public String jsonPointer() {
5454

5555
@Override
5656
public String toString() {
57-
return "ListWorkflowPosition [list=" + queue + "]";
57+
return "QueueWorkflowPosition [queue=" + queue + "]";
5858
}
5959

6060
@Override
@@ -65,6 +65,6 @@ public WorkflowPosition back() {
6565

6666
@Override
6767
public Object last() {
68-
return queue.pollLast();
68+
return queue.getLast();
6969
}
7070
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

+54-17
Original file line numberDiff line numberDiff line change
@@ -28,35 +28,64 @@ public class TaskContext<T extends TaskBase> {
2828
private final JsonNode rawInput;
2929
private final T task;
3030
private final WorkflowPosition position;
31-
private final Instant startedAt = Instant.now();
31+
private final Instant startedAt;
3232

3333
private JsonNode input;
3434
private JsonNode output;
3535
private JsonNode rawOutput;
3636
private FlowDirective flowDirective;
3737
private Map<String, Object> contextVariables;
38+
private Instant completedAt;
3839

3940
public TaskContext(JsonNode input, WorkflowPosition position) {
40-
this.rawInput = input;
41-
this.position = position;
42-
this.task = null;
43-
this.contextVariables = new HashMap<>();
44-
init();
41+
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
4542
}
4643

47-
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
48-
this.rawInput = input;
49-
this.position = taskContext.position.copy();
50-
this.task = task;
51-
this.flowDirective = task.getThen();
52-
this.contextVariables = new HashMap<>(taskContext.variables());
53-
init();
44+
public TaskContext<T> copy() {
45+
return new TaskContext<T>(
46+
rawInput,
47+
task,
48+
position.copy(),
49+
startedAt,
50+
input,
51+
output,
52+
rawOutput,
53+
flowDirective,
54+
new HashMap<>(contextVariables));
5455
}
5556

56-
private void init() {
57-
this.input = rawInput;
58-
this.rawOutput = rawInput;
59-
this.output = rawInput;
57+
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
58+
this(
59+
input,
60+
task,
61+
taskContext.position,
62+
Instant.now(),
63+
input,
64+
input,
65+
input,
66+
task.getThen(),
67+
new HashMap<>(taskContext.variables()));
68+
}
69+
70+
private TaskContext(
71+
JsonNode rawInput,
72+
T task,
73+
WorkflowPosition position,
74+
Instant startedAt,
75+
JsonNode input,
76+
JsonNode output,
77+
JsonNode rawOutput,
78+
FlowDirective flowDirective,
79+
Map<String, Object> contextVariables) {
80+
this.rawInput = rawInput;
81+
this.task = task;
82+
this.position = position;
83+
this.startedAt = startedAt;
84+
this.input = input;
85+
this.output = output;
86+
this.rawOutput = rawOutput;
87+
this.flowDirective = flowDirective;
88+
this.contextVariables = contextVariables;
6089
}
6190

6291
public void input(JsonNode input) {
@@ -115,4 +144,12 @@ public WorkflowPosition position() {
115144
public Instant startedAt() {
116145
return startedAt;
117146
}
147+
148+
public void completedAt(Instant instant) {
149+
this.completedAt = instant;
150+
}
151+
152+
public Instant completedAt() {
153+
return completedAt;
154+
}
118155
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

+23
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.HashSet;
3333
import java.util.Map;
3434
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
3537

3638
public class WorkflowApplication implements AutoCloseable {
3739

@@ -43,8 +45,11 @@ public class WorkflowApplication implements AutoCloseable {
4345
private final Collection<WorkflowExecutionListener> listeners;
4446
private final Map<WorkflowId, WorkflowDefinition> definitions;
4547
private final WorkflowPositionFactory positionFactory;
48+
private final ExecutorServiceFactory executorFactory;
4649
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
4750

51+
private ExecutorService executorService;
52+
4853
public WorkflowApplication(
4954
TaskExecutorFactory taskFactory,
5055
ExpressionFactory exprFactory,
@@ -53,6 +58,7 @@ public WorkflowApplication(
5358
WorkflowPositionFactory positionFactory,
5459
WorkflowIdFactory idFactory,
5560
RuntimeDescriptorFactory runtimeDescriptorFactory,
61+
ExecutorServiceFactory executorFactory,
5662
Collection<WorkflowExecutionListener> listeners) {
5763
this.taskFactory = taskFactory;
5864
this.exprFactory = exprFactory;
@@ -61,6 +67,7 @@ public WorkflowApplication(
6167
this.positionFactory = positionFactory;
6268
this.idFactory = idFactory;
6369
this.runtimeDescriptorFactory = runtimeDescriptorFactory;
70+
this.executorFactory = executorFactory;
6471
this.listeners = listeners;
6572
this.definitions = new ConcurrentHashMap<>();
6673
}
@@ -101,6 +108,7 @@ public static class Builder {
101108
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();
102109
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
103110
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
111+
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
104112
private RuntimeDescriptorFactory descriptorFactory =
105113
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
106114

@@ -129,6 +137,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
129137
return this;
130138
}
131139

140+
public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
141+
this.executorFactory = executorFactory;
142+
return this;
143+
}
144+
132145
public Builder withPositionFactory(WorkflowPositionFactory positionFactory) {
133146
this.positionFactory = positionFactory;
134147
return this;
@@ -158,6 +171,7 @@ public WorkflowApplication build() {
158171
positionFactory,
159172
idFactory,
160173
descriptorFactory,
174+
executorFactory,
161175
listeners == null
162176
? Collections.emptySet()
163177
: Collections.unmodifiableCollection(listeners));
@@ -190,4 +204,13 @@ public WorkflowPositionFactory positionFactory() {
190204
public RuntimeDescriptorFactory runtimeDescriptorFactory() {
191205
return runtimeDescriptorFactory;
192206
}
207+
208+
public ExecutorService executorService() {
209+
synchronized (executorFactory) {
210+
if (executorService == null) {
211+
executorService = executorFactory.get();
212+
}
213+
}
214+
return executorService;
215+
}
193216
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Optional;
3535
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ExecutorService;
3637

3738
public class WorkflowDefinition implements AutoCloseable {
3839

@@ -134,6 +135,10 @@ public WorkflowPositionFactory positionFactory() {
134135
return application.positionFactory();
135136
}
136137

138+
public ExecutorService executorService() {
139+
return application.executorService();
140+
}
141+
137142
public RuntimeDescriptorFactory runtimeDescriptorFactory() {
138143
return application.runtimeDescriptorFactory();
139144
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
import io.serverlessworkflow.api.types.Task;
18+
import io.serverlessworkflow.api.types.TaskBase;
1919

2020
public interface WorkflowExecutionListener {
2121

22-
void onTaskStarted(WorkflowPosition currentPos, Task task);
22+
void onTaskStarted(WorkflowPosition currentPos, TaskBase task);
2323

24-
void onTaskEnded(WorkflowPosition currentPos, Task task);
24+
void onTaskEnded(WorkflowPosition currentPos, TaskBase task);
2525
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.fasterxml.jackson.databind.node.NullNode;
2222
import java.time.Instant;
23+
import java.util.concurrent.atomic.AtomicReference;
2324

2425
public class WorkflowInstance {
25-
private WorkflowState state;
26-
private TaskContext<?> taskContext;
26+
private final AtomicReference<WorkflowState> state;
27+
private final TaskContext<?> taskContext;
2728
private final String id;
2829
private final JsonNode input;
2930
private final Instant startedAt;
30-
private JsonNode context = NullNode.getInstance();
31+
private final AtomicReference<JsonNode> context;
3132

3233
WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
3334
this.id = definition.idFactory().get();
@@ -39,7 +40,8 @@ public class WorkflowInstance {
3940
definition
4041
.inputFilter()
4142
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
42-
state = WorkflowState.STARTED;
43+
state = new AtomicReference<>(WorkflowState.STARTED);
44+
context = new AtomicReference<>(NullNode.getInstance());
4345
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
4446
definition
4547
.outputFilter()
@@ -62,22 +64,26 @@ public JsonNode input() {
6264
}
6365

6466
public JsonNode context() {
65-
return context;
67+
return context.get();
6668
}
6769

6870
public WorkflowState state() {
69-
return state;
71+
return state.get();
72+
}
73+
74+
public void state(WorkflowState state) {
75+
this.state.set(state);
7076
}
7177

7278
public Object output() {
7379
return toJavaValue(taskContext.output());
7480
}
7581

76-
public Object outputAsJsonNode() {
82+
public JsonNode outputAsJsonNode() {
7783
return taskContext.output();
7884
}
7985

8086
void context(JsonNode context) {
81-
this.context = context;
87+
this.context.set(context);
8288
}
8389
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

+22-21
Original file line numberDiff line numberDiff line change
@@ -153,48 +153,49 @@ public static void processTaskList(
153153
TaskItem nextTask = iter.next();
154154
while (nextTask != null) {
155155
TaskItem task = nextTask;
156-
parentTask.position().addIndex(iter.previousIndex()).addProperty(task.getName());
157-
context
158-
.definition()
159-
.listeners()
160-
.forEach(l -> l.onTaskStarted(parentTask.position(), task.getTask()));
161-
currentContext =
162-
context
163-
.definition()
164-
.taskExecutors()
165-
.computeIfAbsent(
166-
parentTask.position().jsonPointer(),
167-
k ->
168-
context
169-
.definition()
170-
.taskFactory()
171-
.getTaskExecutor(task.getTask(), context.definition()))
172-
.apply(context, parentTask, currentContext.output());
156+
parentTask.position().addIndex(iter.previousIndex());
157+
currentContext = executeTask(context, parentTask, task, currentContext.output());
173158
FlowDirective flowDirective = currentContext.flowDirective();
174159
if (flowDirective.getFlowDirectiveEnum() != null) {
175160
switch (flowDirective.getFlowDirectiveEnum()) {
176161
case CONTINUE:
177162
nextTask = iter.hasNext() ? iter.next() : null;
178163
break;
179164
case END:
165+
context.instance().state(WorkflowState.COMPLETED);
180166
case EXIT:
181167
nextTask = null;
182168
break;
183169
}
184170
} else {
185171
nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString());
186172
}
187-
context
188-
.definition()
189-
.listeners()
190-
.forEach(l -> l.onTaskEnded(parentTask.position(), task.getTask()));
191173
parentTask.position().back();
192174
}
193175
}
194176
parentTask.position().back();
195177
parentTask.rawOutput(currentContext.output());
196178
}
197179

180+
public static TaskContext<?> executeTask(
181+
WorkflowContext context, TaskContext<?> parentTask, TaskItem task, JsonNode input) {
182+
parentTask.position().addProperty(task.getName());
183+
TaskContext<?> result =
184+
context
185+
.definition()
186+
.taskExecutors()
187+
.computeIfAbsent(
188+
parentTask.position().jsonPointer(),
189+
k ->
190+
context
191+
.definition()
192+
.taskFactory()
193+
.getTaskExecutor(task.getTask(), context.definition()))
194+
.apply(context, parentTask, input);
195+
parentTask.position().back();
196+
return result;
197+
}
198+
198199
public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {
199200
assert str != null;
200201
Expression expression = exprFactory.getExpression(str);

0 commit comments

Comments
 (0)