Skip to content

Commit a59fe58

Browse files
authored
Merge pull request #489 from fjtirado/Fix_#484
[Fix #484] Execute Fork task
2 parents 91bbe5c + 202d485 commit a59fe58

24 files changed

+620
-155
lines changed

impl/core/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,10 @@
5050
<artifactId>assertj-core</artifactId>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>ch.qos.logback</groupId>
55+
<artifactId>logback-classic</artifactId>
56+
<scope>test</scope>
57+
</dependency>
5358
</dependencies>
5459
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.function.Supplier;
20+
21+
@FunctionalInterface
22+
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.function.BiFunction;
19+
20+
@FunctionalInterface
21+
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}

impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public String jsonPointer() {
5454

5555
@Override
5656
public String toString() {
57-
return "ListWorkflowPosition [list=" + queue + "]";
57+
return "QueueWorkflowPosition [queue=" + queue + "]";
5858
}
5959

6060
@Override
@@ -65,6 +65,6 @@ public WorkflowPosition back() {
6565

6666
@Override
6767
public Object last() {
68-
return queue.pollLast();
68+
return queue.getLast();
6969
}
7070
}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

+52-15
Original file line numberDiff line numberDiff line change
@@ -28,35 +28,64 @@ public class TaskContext<T extends TaskBase> {
2828
private final JsonNode rawInput;
2929
private final T task;
3030
private final WorkflowPosition position;
31-
private final Instant startedAt = Instant.now();
31+
private final Instant startedAt;
3232

3333
private JsonNode input;
3434
private JsonNode output;
3535
private JsonNode rawOutput;
3636
private FlowDirective flowDirective;
3737
private Map<String, Object> contextVariables;
38+
private Instant completedAt;
3839

3940
public TaskContext(JsonNode input, WorkflowPosition position) {
40-
this.rawInput = input;
41-
this.position = position;
42-
this.task = null;
43-
this.contextVariables = new HashMap<>();
44-
init();
41+
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
4542
}
4643

4744
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
48-
this.rawInput = input;
49-
this.position = taskContext.position.copy();
45+
this(
46+
input,
47+
task,
48+
taskContext.position,
49+
Instant.now(),
50+
input,
51+
input,
52+
input,
53+
task.getThen(),
54+
new HashMap<>(taskContext.variables()));
55+
}
56+
57+
private TaskContext(
58+
JsonNode rawInput,
59+
T task,
60+
WorkflowPosition position,
61+
Instant startedAt,
62+
JsonNode input,
63+
JsonNode output,
64+
JsonNode rawOutput,
65+
FlowDirective flowDirective,
66+
Map<String, Object> contextVariables) {
67+
this.rawInput = rawInput;
5068
this.task = task;
51-
this.flowDirective = task.getThen();
52-
this.contextVariables = new HashMap<>(taskContext.variables());
53-
init();
69+
this.position = position;
70+
this.startedAt = startedAt;
71+
this.input = input;
72+
this.output = output;
73+
this.rawOutput = rawOutput;
74+
this.flowDirective = flowDirective;
75+
this.contextVariables = contextVariables;
5476
}
5577

56-
private void init() {
57-
this.input = rawInput;
58-
this.rawOutput = rawInput;
59-
this.output = rawInput;
78+
public TaskContext<T> copy() {
79+
return new TaskContext<T>(
80+
rawInput,
81+
task,
82+
position.copy(),
83+
startedAt,
84+
input,
85+
output,
86+
rawOutput,
87+
flowDirective,
88+
new HashMap<>(contextVariables));
6089
}
6190

6291
public void input(JsonNode input) {
@@ -115,4 +144,12 @@ public WorkflowPosition position() {
115144
public Instant startedAt() {
116145
return startedAt;
117146
}
147+
148+
public void completedAt(Instant instant) {
149+
this.completedAt = instant;
150+
}
151+
152+
public Instant completedAt() {
153+
return completedAt;
154+
}
118155
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

+23
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.HashSet;
3333
import java.util.Map;
3434
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
3537

3638
public class WorkflowApplication implements AutoCloseable {
3739

@@ -43,8 +45,11 @@ public class WorkflowApplication implements AutoCloseable {
4345
private final Collection<WorkflowExecutionListener> listeners;
4446
private final Map<WorkflowId, WorkflowDefinition> definitions;
4547
private final WorkflowPositionFactory positionFactory;
48+
private final ExecutorServiceFactory executorFactory;
4649
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
4750

51+
private ExecutorService executorService;
52+
4853
public WorkflowApplication(
4954
TaskExecutorFactory taskFactory,
5055
ExpressionFactory exprFactory,
@@ -53,6 +58,7 @@ public WorkflowApplication(
5358
WorkflowPositionFactory positionFactory,
5459
WorkflowIdFactory idFactory,
5560
RuntimeDescriptorFactory runtimeDescriptorFactory,
61+
ExecutorServiceFactory executorFactory,
5662
Collection<WorkflowExecutionListener> listeners) {
5763
this.taskFactory = taskFactory;
5864
this.exprFactory = exprFactory;
@@ -61,6 +67,7 @@ public WorkflowApplication(
6167
this.positionFactory = positionFactory;
6268
this.idFactory = idFactory;
6369
this.runtimeDescriptorFactory = runtimeDescriptorFactory;
70+
this.executorFactory = executorFactory;
6471
this.listeners = listeners;
6572
this.definitions = new ConcurrentHashMap<>();
6673
}
@@ -101,6 +108,7 @@ public static class Builder {
101108
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();
102109
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
103110
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
111+
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
104112
private RuntimeDescriptorFactory descriptorFactory =
105113
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
106114

@@ -129,6 +137,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
129137
return this;
130138
}
131139

140+
public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
141+
this.executorFactory = executorFactory;
142+
return this;
143+
}
144+
132145
public Builder withPositionFactory(WorkflowPositionFactory positionFactory) {
133146
this.positionFactory = positionFactory;
134147
return this;
@@ -158,6 +171,7 @@ public WorkflowApplication build() {
158171
positionFactory,
159172
idFactory,
160173
descriptorFactory,
174+
executorFactory,
161175
listeners == null
162176
? Collections.emptySet()
163177
: Collections.unmodifiableCollection(listeners));
@@ -190,4 +204,13 @@ public WorkflowPositionFactory positionFactory() {
190204
public RuntimeDescriptorFactory runtimeDescriptorFactory() {
191205
return runtimeDescriptorFactory;
192206
}
207+
208+
public ExecutorService executorService() {
209+
synchronized (executorFactory) {
210+
if (executorService == null) {
211+
executorService = executorFactory.get();
212+
}
213+
}
214+
return executorService;
215+
}
193216
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Optional;
3535
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ExecutorService;
3637

3738
public class WorkflowDefinition implements AutoCloseable {
3839

@@ -48,22 +49,19 @@ public class WorkflowDefinition implements AutoCloseable {
4849

4950
private WorkflowDefinition(
5051
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
51-
5252
this.workflow = workflow;
5353
this.application = application;
5454
this.resourceLoader = resourceLoader;
5555
if (workflow.getInput() != null) {
5656
Input input = workflow.getInput();
5757
this.inputSchemaValidator =
58-
getSchemaValidator(
59-
application.validatorFactory(), schemaToNode(resourceLoader, input.getSchema()));
58+
getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema());
6059
this.inputFilter = buildWorkflowFilter(application.expressionFactory(), input.getFrom());
6160
}
6261
if (workflow.getOutput() != null) {
6362
Output output = workflow.getOutput();
6463
this.outputSchemaValidator =
65-
getSchemaValidator(
66-
application.validatorFactory(), schemaToNode(resourceLoader, output.getSchema()));
64+
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
6765
this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs());
6866
}
6967
}
@@ -134,6 +132,10 @@ public WorkflowPositionFactory positionFactory() {
134132
return application.positionFactory();
135133
}
136134

135+
public ExecutorService executorService() {
136+
return application.executorService();
137+
}
138+
137139
public RuntimeDescriptorFactory runtimeDescriptorFactory() {
138140
return application.runtimeDescriptorFactory();
139141
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
import io.serverlessworkflow.api.types.Task;
18+
import io.serverlessworkflow.api.types.TaskBase;
1919

2020
public interface WorkflowExecutionListener {
2121

22-
void onTaskStarted(WorkflowPosition currentPos, Task task);
22+
void onTaskStarted(WorkflowPosition currentPos, TaskBase task);
2323

24-
void onTaskEnded(WorkflowPosition currentPos, Task task);
24+
void onTaskEnded(WorkflowPosition currentPos, TaskBase task);
2525
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919

2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.fasterxml.jackson.databind.node.NullNode;
22+
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2223
import java.time.Instant;
24+
import java.util.concurrent.atomic.AtomicReference;
2325

2426
public class WorkflowInstance {
25-
private WorkflowState state;
26-
private TaskContext<?> taskContext;
27+
private final AtomicReference<WorkflowStatus> status;
28+
private final TaskContext<?> taskContext;
2729
private final String id;
2830
private final JsonNode input;
2931
private final Instant startedAt;
30-
private JsonNode context = NullNode.getInstance();
32+
private final AtomicReference<JsonNode> context;
3133

3234
WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
3335
this.id = definition.idFactory().get();
@@ -39,14 +41,16 @@ public class WorkflowInstance {
3941
definition
4042
.inputFilter()
4143
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
42-
state = WorkflowState.STARTED;
43-
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
44+
status = new AtomicReference<>(WorkflowStatus.RUNNING);
45+
context = new AtomicReference<>(NullNode.getInstance());
46+
TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
4447
definition
4548
.outputFilter()
4649
.ifPresent(
4750
f ->
4851
taskContext.output(f.apply(workflowContext, taskContext, taskContext.rawOutput())));
4952
definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output()));
53+
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
5054
}
5155

5256
public String id() {
@@ -62,22 +66,26 @@ public JsonNode input() {
6266
}
6367

6468
public JsonNode context() {
65-
return context;
69+
return context.get();
6670
}
6771

68-
public WorkflowState state() {
69-
return state;
72+
public WorkflowStatus status() {
73+
return status.get();
74+
}
75+
76+
public void status(WorkflowStatus state) {
77+
this.status.set(state);
7078
}
7179

7280
public Object output() {
7381
return toJavaValue(taskContext.output());
7482
}
7583

76-
public Object outputAsJsonNode() {
84+
public JsonNode outputAsJsonNode() {
7785
return taskContext.output();
7886
}
7987

8088
void context(JsonNode context) {
81-
this.context = context;
89+
this.context.set(context);
8290
}
8391
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
public enum WorkflowState {
19-
STARTED,
18+
public enum WorkflowStatus {
19+
PENDING,
20+
RUNNING,
2021
WAITING,
21-
COMPLETED
22+
COMPLETED,
23+
FAULTED,
24+
CANCELLED
2225
}

0 commit comments

Comments
 (0)