Skip to content

Commit 23710ee

Browse files
Fix #238 - Add support to fork task (#240)
* Fix #238 - Add support to fork task Signed-off-by: Ricardo Zanini <[email protected]> * Adding missed headers Signed-off-by: Ricardo Zanini <[email protected]> * Fix linters, makefile, fmt Signed-off-by: Ricardo Zanini <[email protected]> * Fix Labeler CI Signed-off-by: Ricardo Zanini <[email protected]> * Remove labeler Signed-off-by: Ricardo Zanini <[email protected]> --------- Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 46481f6 commit 23710ee

18 files changed

+413
-137
lines changed

.github/workflows/pull_request_labeler.yml

Lines changed: 0 additions & 26 deletions
This file was deleted.

Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ goimports:
1111
@goimports -w .
1212

1313
lint:
14-
@echo "🚀 Running lint..."
15-
@command -v golangci-lint > /dev/null || (echo "🚀 Installing golangci-lint..."; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b "${GOPATH}/bin")
14+
@echo "🚀 Installing/updating golangci-lint…"
15+
GO111MODULE=on go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
16+
17+
@echo "🚀 Running lint…"
1618
@make addheaders
1719
@make goimports
1820
@make fmt
19-
@./hack/go-lint.sh ${params}
21+
@$(GOPATH)/bin/golangci-lint run ./... ${params}
2022
@echo "✅ Linting completed!"
2123

2224
.PHONY: test

impl/ctx/context.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
"github.com/serverlessworkflow/sdk-go/v3/impl/utils"
26+
2527
"github.com/google/uuid"
2628
"github.com/serverlessworkflow/sdk-go/v3/model"
2729
)
@@ -71,6 +73,7 @@ type WorkflowContext interface {
7173
SetLocalExprVars(vars map[string]interface{})
7274
AddLocalExprVars(vars map[string]interface{})
7375
RemoveLocalExprVars(keys ...string)
76+
Clone() WorkflowContext
7477
}
7578

7679
// workflowContext holds the necessary data for the workflow execution within the instance.
@@ -118,6 +121,38 @@ func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) {
118121
return wfCtx, nil
119122
}
120123

124+
func (ctx *workflowContext) Clone() WorkflowContext {
125+
ctx.mu.Lock()
126+
defer ctx.mu.Unlock()
127+
128+
newInput := utils.DeepCloneValue(ctx.input)
129+
newOutput := utils.DeepCloneValue(ctx.output)
130+
131+
// deep clone each of the maps
132+
newContextMap := utils.DeepClone(ctx.context)
133+
newWorkflowDesc := utils.DeepClone(ctx.workflowDescriptor)
134+
newTaskDesc := utils.DeepClone(ctx.taskDescriptor)
135+
newLocalExprVars := utils.DeepClone(ctx.localExprVars)
136+
137+
newStatusPhase := append([]StatusPhaseLog(nil), ctx.StatusPhase...)
138+
139+
newTasksStatusPhase := make(map[string][]StatusPhaseLog, len(ctx.TasksStatusPhase))
140+
for taskName, logs := range ctx.TasksStatusPhase {
141+
newTasksStatusPhase[taskName] = append([]StatusPhaseLog(nil), logs...)
142+
}
143+
144+
return &workflowContext{
145+
input: newInput,
146+
output: newOutput,
147+
context: newContextMap,
148+
workflowDescriptor: newWorkflowDesc,
149+
taskDescriptor: newTaskDesc,
150+
localExprVars: newLocalExprVars,
151+
StatusPhase: newStatusPhase,
152+
TasksStatusPhase: newTasksStatusPhase,
153+
}
154+
}
155+
121156
func (ctx *workflowContext) SetStartedAt(t time.Time) {
122157
ctx.mu.Lock()
123158
defer ctx.mu.Unlock()

impl/expr/expr.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,28 @@ func mergeContextInVars(nodeCtx context.Context, variables map[string]interface{
132132

133133
return nil
134134
}
135+
136+
func TraverseAndEvaluateObj(runtimeExpr *model.ObjectOrRuntimeExpr, input interface{}, taskName string, wfCtx context.Context) (output interface{}, err error) {
137+
if runtimeExpr == nil {
138+
return input, nil
139+
}
140+
output, err = TraverseAndEvaluate(runtimeExpr.AsStringOrMap(), input, wfCtx)
141+
if err != nil {
142+
return nil, model.NewErrExpression(err, taskName)
143+
}
144+
return output, nil
145+
}
146+
147+
func TraverseAndEvaluateBool(runtimeExpr string, input interface{}, wfCtx context.Context) (bool, error) {
148+
if len(runtimeExpr) == 0 {
149+
return false, nil
150+
}
151+
output, err := TraverseAndEvaluate(runtimeExpr, input, wfCtx)
152+
if err != nil {
153+
return false, nil
154+
}
155+
if result, ok := output.(bool); ok {
156+
return result, nil
157+
}
158+
return false, nil
159+
}

impl/runner.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import (
1919
"fmt"
2020
"time"
2121

22+
"github.com/serverlessworkflow/sdk-go/v3/impl/expr"
23+
"github.com/serverlessworkflow/sdk-go/v3/impl/utils"
24+
2225
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2326
"github.com/serverlessworkflow/sdk-go/v3/model"
2427
)
@@ -53,6 +56,18 @@ type workflowRunnerImpl struct {
5356
RunnerCtx ctx.WorkflowContext
5457
}
5558

59+
func (wr *workflowRunnerImpl) CloneWithContext(newCtx context.Context) TaskSupport {
60+
clonedWfCtx := wr.RunnerCtx.Clone()
61+
62+
ctxWithWf := ctx.WithWorkflowContext(newCtx, clonedWfCtx)
63+
64+
return &workflowRunnerImpl{
65+
Workflow: wr.Workflow,
66+
Context: ctxWithWf,
67+
RunnerCtx: clonedWfCtx,
68+
}
69+
}
70+
5671
func (wr *workflowRunnerImpl) RemoveLocalExprVars(keys ...string) {
5772
wr.RunnerCtx.RemoveLocalExprVars(keys...)
5873
}
@@ -175,13 +190,13 @@ func (wr *workflowRunnerImpl) wrapWorkflowError(err error) error {
175190
func (wr *workflowRunnerImpl) processInput(input interface{}) (output interface{}, err error) {
176191
if wr.Workflow.Input != nil {
177192
if wr.Workflow.Input.Schema != nil {
178-
if err = validateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil {
193+
if err = utils.ValidateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil {
179194
return nil, err
180195
}
181196
}
182197

183198
if wr.Workflow.Input.From != nil {
184-
output, err = traverseAndEvaluate(wr.Workflow.Input.From, input, "/", wr.Context)
199+
output, err = expr.TraverseAndEvaluateObj(wr.Workflow.Input.From, input, "/", wr.Context)
185200
if err != nil {
186201
return nil, err
187202
}
@@ -196,13 +211,13 @@ func (wr *workflowRunnerImpl) processOutput(output interface{}) (interface{}, er
196211
if wr.Workflow.Output != nil {
197212
if wr.Workflow.Output.As != nil {
198213
var err error
199-
output, err = traverseAndEvaluate(wr.Workflow.Output.As, output, "/", wr.Context)
214+
output, err = expr.TraverseAndEvaluateObj(wr.Workflow.Output.As, output, "/", wr.Context)
200215
if err != nil {
201216
return nil, err
202217
}
203218
}
204219
if wr.Workflow.Output.Schema != nil {
205-
if err := validateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil {
220+
if err := utils.ValidateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil {
206221
return nil, err
207222
}
208223
}

impl/runner_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,3 +456,14 @@ func TestSwitchTaskRunner_DefaultCase(t *testing.T) {
456456
runWorkflowTest(t, workflowPath, input, expectedOutput)
457457
})
458458
}
459+
460+
func TestForkSimple_NoCompete(t *testing.T) {
461+
t.Run("Create a color array", func(t *testing.T) {
462+
workflowPath := "./testdata/fork_simple.yaml"
463+
input := map[string]interface{}{}
464+
expectedOutput := map[string]interface{}{
465+
"colors": []interface{}{"red", "blue"},
466+
}
467+
runWorkflowTest(t, workflowPath, input, expectedOutput)
468+
})
469+
}

impl/task_runner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,7 @@ type TaskSupport interface {
5353
AddLocalExprVars(vars map[string]interface{})
5454
// RemoveLocalExprVars removes local variables added in AddLocalExprVars or SetLocalExprVars
5555
RemoveLocalExprVars(keys ...string)
56+
// CloneWithContext returns a full clone of this TaskSupport, but using
57+
// the provided context.Context (so deadlines/cancellations propagate).
58+
CloneWithContext(ctx context.Context) TaskSupport
5659
}

impl/task_runner_do.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import (
1818
"fmt"
1919
"time"
2020

21+
"github.com/serverlessworkflow/sdk-go/v3/impl/expr"
22+
"github.com/serverlessworkflow/sdk-go/v3/impl/utils"
23+
2124
"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
2225
"github.com/serverlessworkflow/sdk-go/v3/model"
2326
)
@@ -35,6 +38,8 @@ func NewTaskRunner(taskName string, task model.Task, workflowDef *model.Workflow
3538
return NewForTaskRunner(taskName, t)
3639
case *model.CallHTTP:
3740
return NewCallHttpRunner(taskName, t)
41+
case *model.ForkTask:
42+
return NewForkTaskRunner(taskName, t, workflowDef)
3843
default:
3944
return nil, fmt.Errorf("unsupported task type '%T' for task '%s'", t, taskName)
4045
}
@@ -117,7 +122,7 @@ func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (out
117122
}
118123

119124
taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
120-
input = deepCloneValue(output)
125+
input = utils.DeepCloneValue(output)
121126
idx, currentTask = d.TaskList.Next(idx)
122127
}
123128

@@ -126,7 +131,7 @@ func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (out
126131

127132
func (d *DoTaskRunner) shouldRunTask(input interface{}, taskSupport TaskSupport, task *model.TaskItem) (bool, error) {
128133
if task.GetBase().If != nil {
129-
output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext())
134+
output, err := expr.TraverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext())
130135
if err != nil {
131136
return false, model.NewErrExpression(err, task.Key)
132137
}
@@ -143,7 +148,7 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskSupport TaskSup
143148
defaultThen = switchCase.Then
144149
continue
145150
}
146-
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext())
151+
result, err := expr.TraverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext())
147152
if err != nil {
148153
return nil, model.NewErrExpression(err, taskKey)
149154
}
@@ -199,11 +204,11 @@ func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interfac
199204
return taskInput, nil
200205
}
201206

202-
if err = validateSchema(taskInput, task.Input.Schema, taskName); err != nil {
207+
if err = utils.ValidateSchema(taskInput, task.Input.Schema, taskName); err != nil {
203208
return nil, err
204209
}
205210

206-
if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil {
211+
if output, err = expr.TraverseAndEvaluateObj(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil {
207212
return nil, err
208213
}
209214

@@ -216,11 +221,11 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf
216221
return taskOutput, nil
217222
}
218223

219-
if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil {
224+
if output, err = expr.TraverseAndEvaluateObj(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil {
220225
return nil, err
221226
}
222227

223-
if err = validateSchema(output, task.Output.Schema, taskName); err != nil {
228+
if err = utils.ValidateSchema(output, task.Output.Schema, taskName); err != nil {
224229
return nil, err
225230
}
226231

@@ -232,12 +237,12 @@ func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interf
232237
return nil
233238
}
234239

235-
output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, taskSupport.GetContext())
240+
output, err := expr.TraverseAndEvaluateObj(task.Export.As, taskOutput, taskName, taskSupport.GetContext())
236241
if err != nil {
237242
return err
238243
}
239244

240-
if err = validateSchema(output, task.Export.Schema, taskName); err != nil {
245+
if err = utils.ValidateSchema(output, task.Export.Schema, taskName); err != nil {
241246
return nil
242247
}
243248

impl/task_runner_for.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (f *ForTaskRunner) Run(input interface{}, taskSupport TaskSupport) (interfa
7373
return nil, err
7474
}
7575
if f.Task.While != "" {
76-
whileIsTrue, err := traverseAndEvaluateBool(f.Task.While, forOutput, taskSupport.GetContext())
76+
whileIsTrue, err := expr.TraverseAndEvaluateBool(f.Task.While, forOutput, taskSupport.GetContext())
7777
if err != nil {
7878
return nil, err
7979
}

0 commit comments

Comments
 (0)