Skip to content

Fix #238 - Add support to fork task #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions .github/workflows/pull_request_labeler.yml

This file was deleted.

8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ goimports:
@goimports -w .

lint:
@echo "🚀 Running lint..."
@command -v golangci-lint > /dev/null || (echo "🚀 Installing golangci-lint..."; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b "${GOPATH}/bin")
@echo "🚀 Installing/updating golangci-lint…"
GO111MODULE=on go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest

@echo "🚀 Running lint…"
@make addheaders
@make goimports
@make fmt
@./hack/go-lint.sh ${params}
@$(GOPATH)/bin/golangci-lint run ./... ${params}
@echo "✅ Linting completed!"

.PHONY: test
Expand Down
35 changes: 35 additions & 0 deletions impl/ctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/serverlessworkflow/sdk-go/v3/impl/utils"

"github.com/google/uuid"
"github.com/serverlessworkflow/sdk-go/v3/model"
)
Expand Down Expand Up @@ -71,6 +73,7 @@ type WorkflowContext interface {
SetLocalExprVars(vars map[string]interface{})
AddLocalExprVars(vars map[string]interface{})
RemoveLocalExprVars(keys ...string)
Clone() WorkflowContext
}

// workflowContext holds the necessary data for the workflow execution within the instance.
Expand Down Expand Up @@ -118,6 +121,38 @@ func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) {
return wfCtx, nil
}

func (ctx *workflowContext) Clone() WorkflowContext {
ctx.mu.Lock()
defer ctx.mu.Unlock()

newInput := utils.DeepCloneValue(ctx.input)
newOutput := utils.DeepCloneValue(ctx.output)

// deep clone each of the maps
newContextMap := utils.DeepClone(ctx.context)
newWorkflowDesc := utils.DeepClone(ctx.workflowDescriptor)
newTaskDesc := utils.DeepClone(ctx.taskDescriptor)
newLocalExprVars := utils.DeepClone(ctx.localExprVars)

newStatusPhase := append([]StatusPhaseLog(nil), ctx.StatusPhase...)

newTasksStatusPhase := make(map[string][]StatusPhaseLog, len(ctx.TasksStatusPhase))
for taskName, logs := range ctx.TasksStatusPhase {
newTasksStatusPhase[taskName] = append([]StatusPhaseLog(nil), logs...)
}

return &workflowContext{
input: newInput,
output: newOutput,
context: newContextMap,
workflowDescriptor: newWorkflowDesc,
taskDescriptor: newTaskDesc,
localExprVars: newLocalExprVars,
StatusPhase: newStatusPhase,
TasksStatusPhase: newTasksStatusPhase,
}
}

func (ctx *workflowContext) SetStartedAt(t time.Time) {
ctx.mu.Lock()
defer ctx.mu.Unlock()
Expand Down
25 changes: 25 additions & 0 deletions impl/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,28 @@ func mergeContextInVars(nodeCtx context.Context, variables map[string]interface{

return nil
}

func TraverseAndEvaluateObj(runtimeExpr *model.ObjectOrRuntimeExpr, input interface{}, taskName string, wfCtx context.Context) (output interface{}, err error) {
if runtimeExpr == nil {
return input, nil
}
output, err = TraverseAndEvaluate(runtimeExpr.AsStringOrMap(), input, wfCtx)
if err != nil {
return nil, model.NewErrExpression(err, taskName)
}
return output, nil
}

func TraverseAndEvaluateBool(runtimeExpr string, input interface{}, wfCtx context.Context) (bool, error) {
if len(runtimeExpr) == 0 {
return false, nil
}
output, err := TraverseAndEvaluate(runtimeExpr, input, wfCtx)
if err != nil {
return false, nil
}
if result, ok := output.(bool); ok {
return result, nil
}
return false, nil
}
23 changes: 19 additions & 4 deletions impl/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"fmt"
"time"

"github.com/serverlessworkflow/sdk-go/v3/impl/expr"
"github.com/serverlessworkflow/sdk-go/v3/impl/utils"

"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
"github.com/serverlessworkflow/sdk-go/v3/model"
)
Expand Down Expand Up @@ -53,6 +56,18 @@ type workflowRunnerImpl struct {
RunnerCtx ctx.WorkflowContext
}

func (wr *workflowRunnerImpl) CloneWithContext(newCtx context.Context) TaskSupport {
clonedWfCtx := wr.RunnerCtx.Clone()

ctxWithWf := ctx.WithWorkflowContext(newCtx, clonedWfCtx)

return &workflowRunnerImpl{
Workflow: wr.Workflow,
Context: ctxWithWf,
RunnerCtx: clonedWfCtx,
}
}

func (wr *workflowRunnerImpl) RemoveLocalExprVars(keys ...string) {
wr.RunnerCtx.RemoveLocalExprVars(keys...)
}
Expand Down Expand Up @@ -175,13 +190,13 @@ func (wr *workflowRunnerImpl) wrapWorkflowError(err error) error {
func (wr *workflowRunnerImpl) processInput(input interface{}) (output interface{}, err error) {
if wr.Workflow.Input != nil {
if wr.Workflow.Input.Schema != nil {
if err = validateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil {
if err = utils.ValidateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil {
return nil, err
}
}

if wr.Workflow.Input.From != nil {
output, err = traverseAndEvaluate(wr.Workflow.Input.From, input, "/", wr.Context)
output, err = expr.TraverseAndEvaluateObj(wr.Workflow.Input.From, input, "/", wr.Context)
if err != nil {
return nil, err
}
Expand All @@ -196,13 +211,13 @@ func (wr *workflowRunnerImpl) processOutput(output interface{}) (interface{}, er
if wr.Workflow.Output != nil {
if wr.Workflow.Output.As != nil {
var err error
output, err = traverseAndEvaluate(wr.Workflow.Output.As, output, "/", wr.Context)
output, err = expr.TraverseAndEvaluateObj(wr.Workflow.Output.As, output, "/", wr.Context)
if err != nil {
return nil, err
}
}
if wr.Workflow.Output.Schema != nil {
if err := validateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil {
if err := utils.ValidateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil {
return nil, err
}
}
Expand Down
11 changes: 11 additions & 0 deletions impl/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,14 @@ func TestSwitchTaskRunner_DefaultCase(t *testing.T) {
runWorkflowTest(t, workflowPath, input, expectedOutput)
})
}

func TestForkSimple_NoCompete(t *testing.T) {
t.Run("Create a color array", func(t *testing.T) {
workflowPath := "./testdata/fork_simple.yaml"
input := map[string]interface{}{}
expectedOutput := map[string]interface{}{
"colors": []interface{}{"red", "blue"},
}
runWorkflowTest(t, workflowPath, input, expectedOutput)
})
}
3 changes: 3 additions & 0 deletions impl/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ type TaskSupport interface {
AddLocalExprVars(vars map[string]interface{})
// RemoveLocalExprVars removes local variables added in AddLocalExprVars or SetLocalExprVars
RemoveLocalExprVars(keys ...string)
// CloneWithContext returns a full clone of this TaskSupport, but using
// the provided context.Context (so deadlines/cancellations propagate).
CloneWithContext(ctx context.Context) TaskSupport
}
23 changes: 14 additions & 9 deletions impl/task_runner_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"fmt"
"time"

"github.com/serverlessworkflow/sdk-go/v3/impl/expr"
"github.com/serverlessworkflow/sdk-go/v3/impl/utils"

"github.com/serverlessworkflow/sdk-go/v3/impl/ctx"
"github.com/serverlessworkflow/sdk-go/v3/model"
)
Expand All @@ -35,6 +38,8 @@ func NewTaskRunner(taskName string, task model.Task, workflowDef *model.Workflow
return NewForTaskRunner(taskName, t)
case *model.CallHTTP:
return NewCallHttpRunner(taskName, t)
case *model.ForkTask:
return NewForkTaskRunner(taskName, t, workflowDef)
default:
return nil, fmt.Errorf("unsupported task type '%T' for task '%s'", t, taskName)
}
Expand Down Expand Up @@ -117,7 +122,7 @@ func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (out
}

taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus)
input = deepCloneValue(output)
input = utils.DeepCloneValue(output)
idx, currentTask = d.TaskList.Next(idx)
}

Expand All @@ -126,7 +131,7 @@ func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (out

func (d *DoTaskRunner) shouldRunTask(input interface{}, taskSupport TaskSupport, task *model.TaskItem) (bool, error) {
if task.GetBase().If != nil {
output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext())
output, err := expr.TraverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext())
if err != nil {
return false, model.NewErrExpression(err, task.Key)
}
Expand All @@ -143,7 +148,7 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskSupport TaskSup
defaultThen = switchCase.Then
continue
}
result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext())
result, err := expr.TraverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext())
if err != nil {
return nil, model.NewErrExpression(err, taskKey)
}
Expand Down Expand Up @@ -199,11 +204,11 @@ func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interfac
return taskInput, nil
}

if err = validateSchema(taskInput, task.Input.Schema, taskName); err != nil {
if err = utils.ValidateSchema(taskInput, task.Input.Schema, taskName); err != nil {
return nil, err
}

if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil {
if output, err = expr.TraverseAndEvaluateObj(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil {
return nil, err
}

Expand All @@ -216,11 +221,11 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf
return taskOutput, nil
}

if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil {
if output, err = expr.TraverseAndEvaluateObj(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil {
return nil, err
}

if err = validateSchema(output, task.Output.Schema, taskName); err != nil {
if err = utils.ValidateSchema(output, task.Output.Schema, taskName); err != nil {
return nil, err
}

Expand All @@ -232,12 +237,12 @@ func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interf
return nil
}

output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, taskSupport.GetContext())
output, err := expr.TraverseAndEvaluateObj(task.Export.As, taskOutput, taskName, taskSupport.GetContext())
if err != nil {
return err
}

if err = validateSchema(output, task.Export.Schema, taskName); err != nil {
if err = utils.ValidateSchema(output, task.Export.Schema, taskName); err != nil {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion impl/task_runner_for.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (f *ForTaskRunner) Run(input interface{}, taskSupport TaskSupport) (interfa
return nil, err
}
if f.Task.While != "" {
whileIsTrue, err := traverseAndEvaluateBool(f.Task.While, forOutput, taskSupport.GetContext())
whileIsTrue, err := expr.TraverseAndEvaluateBool(f.Task.While, forOutput, taskSupport.GetContext())
if err != nil {
return nil, err
}
Expand Down
Loading
Loading