From cf3064ec5a93913b29e2d2221718e85419561cc0 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 25 Mar 2025 19:16:11 -0400 Subject: [PATCH 1/4] Refactor expr packaged and shared context Signed-off-by: Ricardo Zanini --- expr/expr.go | 112 ------------- impl/{ => ctx}/context.go | 99 ++++++++---- impl/{ => ctx}/status_phase.go | 2 +- impl/expr/expr.go | 136 ++++++++++++++++ impl/expr/expr_test.go | 263 +++++++++++++++++++++++++++++++ impl/runner.go | 77 ++++++--- impl/task_runner.go | 53 ++++--- impl/task_runner_do.go | 50 ++++-- impl/task_runner_raise_test.go | 6 +- impl/task_runner_test.go | 50 +++++- impl/task_set_test.go | 26 +-- impl/utils.go | 25 +-- model/objects.go | 6 + model/runtime_expression.go | 31 +++- model/runtime_expression_test.go | 149 +++++++++++++++++ model/workflow.go | 14 ++ 16 files changed, 855 insertions(+), 244 deletions(-) delete mode 100644 expr/expr.go rename impl/{ => ctx}/context.go (51%) rename impl/{ => ctx}/status_phase.go (99%) create mode 100644 impl/expr/expr.go create mode 100644 impl/expr/expr_test.go diff --git a/expr/expr.go b/expr/expr.go deleted file mode 100644 index cd5a755..0000000 --- a/expr/expr.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2025 The Serverless Workflow Specification Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package expr - -import ( - "errors" - "fmt" - "strings" - - "github.com/itchyny/gojq" -) - -// IsStrictExpr returns true if the string is enclosed in `${ }` -func IsStrictExpr(expression string) bool { - return strings.HasPrefix(expression, "${") && strings.HasSuffix(expression, "}") -} - -// Sanitize processes the expression to ensure it's ready for evaluation -// It removes `${}` if present and replaces single quotes with double quotes -func Sanitize(expression string) string { - // Remove `${}` enclosure if present - if IsStrictExpr(expression) { - expression = strings.TrimSpace(expression[2 : len(expression)-1]) - } - - // Replace single quotes with double quotes - expression = strings.ReplaceAll(expression, "'", "\"") - - return expression -} - -// IsValid tries to parse and check if the given value is a valid expression -func IsValid(expression string) bool { - expression = Sanitize(expression) - _, err := gojq.Parse(expression) - return err == nil -} - -// TraverseAndEvaluate recursively processes and evaluates all expressions in a JSON-like structure -func TraverseAndEvaluate(node interface{}, input interface{}) (interface{}, error) { - switch v := node.(type) { - case map[string]interface{}: - // Traverse map - for key, value := range v { - evaluatedValue, err := TraverseAndEvaluate(value, input) - if err != nil { - return nil, err - } - v[key] = evaluatedValue - } - return v, nil - - case []interface{}: - // Traverse array - for i, value := range v { - evaluatedValue, err := TraverseAndEvaluate(value, input) - if err != nil { - return nil, err - } - v[i] = evaluatedValue - } - return v, nil - - case string: - // Check if the string is a runtime expression (e.g., ${ .some.path }) - if IsStrictExpr(v) { - return evaluateJQExpression(Sanitize(v), input) - } - return v, nil - - default: - // Return other types as-is - return v, nil - } -} - -// TODO: add support to variables see https://github.com/itchyny/gojq/blob/main/option_variables_test.go - -// evaluateJQExpression evaluates a jq expression against a given JSON input -func evaluateJQExpression(expression string, input interface{}) (interface{}, error) { - // Parse the sanitized jq expression - query, err := gojq.Parse(expression) - if err != nil { - return nil, fmt.Errorf("failed to parse jq expression: %s, error: %w", expression, err) - } - - // Compile and evaluate the expression - iter := query.Run(input) - result, ok := iter.Next() - if !ok { - return nil, errors.New("no result from jq evaluation") - } - - // Check if an error occurred during evaluation - if err, isErr := result.(error); isErr { - return nil, fmt.Errorf("jq evaluation error: %w", err) - } - - return result, nil -} diff --git a/impl/context.go b/impl/ctx/context.go similarity index 51% rename from impl/context.go rename to impl/ctx/context.go index ae9375e..d5aa920 100644 --- a/impl/context.go +++ b/impl/ctx/context.go @@ -12,33 +12,76 @@ // See the License for the specific language governing permissions and // limitations under the License. -package impl +package ctx import ( "context" "errors" + "github.com/serverlessworkflow/sdk-go/v3/model" "sync" ) +var ErrWorkflowContextNotFound = errors.New("workflow context not found") + +var _ WorkflowContext = &workflowContext{} + type ctxKey string -const runnerCtxKey ctxKey = "wfRunnerContext" +const ( + runnerCtxKey ctxKey = "wfRunnerContext" + varsContext = "$context" + varsInput = "$input" + varsOutput = "$output" + varsWorkflow = "$workflow" +) + +type WorkflowContext interface { + SetStatus(status StatusPhase) + SetTaskStatus(task string, status StatusPhase) + SetInstanceCtx(value interface{}) + GetInstanceCtx() interface{} + SetInput(input interface{}) + GetInput() interface{} + SetOutput(output interface{}) + GetOutput() interface{} + GetOutputAsMap() map[string]interface{} + AsJQVars() map[string]interface{} +} -// WorkflowContext holds the necessary data for the workflow execution within the instance. -type WorkflowContext struct { +// workflowContext holds the necessary data for the workflow execution within the instance. +type workflowContext struct { mu sync.Mutex - input interface{} // input can hold any type - output interface{} // output can hold any type - context map[string]interface{} + input interface{} // $input can hold any type + output interface{} // $output can hold any type + context map[string]interface{} // Holds `$context` as the key + definition map[string]interface{} // $workflow representation in the context StatusPhase []StatusPhaseLog - TasksStatusPhase map[string][]StatusPhaseLog // Holds `$context` as the key + TasksStatusPhase map[string][]StatusPhaseLog } -type TaskContext interface { - SetTaskStatus(task string, status StatusPhase) +func NewWorkflowContext(workflow *model.Workflow) (WorkflowContext, error) { + workflowCtx := &workflowContext{} + workflowDef, err := workflow.AsMap() + if err != nil { + return nil, err + } + + workflowCtx.definition = workflowDef + workflowCtx.SetStatus(PendingStatus) + + return workflowCtx, nil +} + +func (ctx *workflowContext) AsJQVars() map[string]interface{} { + vars := make(map[string]interface{}) + vars[varsInput] = ctx.GetInput() + vars[varsOutput] = ctx.GetOutput() + vars[varsContext] = ctx.GetInstanceCtx() + vars[varsOutput] = ctx.definition + return vars } -func (ctx *WorkflowContext) SetStatus(status StatusPhase) { +func (ctx *workflowContext) SetStatus(status StatusPhase) { ctx.mu.Lock() defer ctx.mu.Unlock() if ctx.StatusPhase == nil { @@ -47,7 +90,7 @@ func (ctx *WorkflowContext) SetStatus(status StatusPhase) { ctx.StatusPhase = append(ctx.StatusPhase, NewStatusPhaseLog(status)) } -func (ctx *WorkflowContext) SetTaskStatus(task string, status StatusPhase) { +func (ctx *workflowContext) SetTaskStatus(task string, status StatusPhase) { ctx.mu.Lock() defer ctx.mu.Unlock() if ctx.TasksStatusPhase == nil { @@ -57,48 +100,48 @@ func (ctx *WorkflowContext) SetTaskStatus(task string, status StatusPhase) { } // SetInstanceCtx safely sets the `$context` value -func (ctx *WorkflowContext) SetInstanceCtx(value interface{}) { +func (ctx *workflowContext) SetInstanceCtx(value interface{}) { ctx.mu.Lock() defer ctx.mu.Unlock() if ctx.context == nil { ctx.context = make(map[string]interface{}) } - ctx.context["$context"] = value + ctx.context[varsContext] = value } // GetInstanceCtx safely retrieves the `$context` value -func (ctx *WorkflowContext) GetInstanceCtx() interface{} { +func (ctx *workflowContext) GetInstanceCtx() interface{} { ctx.mu.Lock() defer ctx.mu.Unlock() if ctx.context == nil { return nil } - return ctx.context["$context"] + return ctx.context[varsContext] } // SetInput safely sets the input -func (ctx *WorkflowContext) SetInput(input interface{}) { +func (ctx *workflowContext) SetInput(input interface{}) { ctx.mu.Lock() defer ctx.mu.Unlock() ctx.input = input } // GetInput safely retrieves the input -func (ctx *WorkflowContext) GetInput() interface{} { +func (ctx *workflowContext) GetInput() interface{} { ctx.mu.Lock() defer ctx.mu.Unlock() return ctx.input } // SetOutput safely sets the output -func (ctx *WorkflowContext) SetOutput(output interface{}) { +func (ctx *workflowContext) SetOutput(output interface{}) { ctx.mu.Lock() defer ctx.mu.Unlock() ctx.output = output } // GetOutput safely retrieves the output -func (ctx *WorkflowContext) GetOutput() interface{} { +func (ctx *workflowContext) GetOutput() interface{} { ctx.mu.Lock() defer ctx.mu.Unlock() return ctx.output @@ -106,7 +149,7 @@ func (ctx *WorkflowContext) GetOutput() interface{} { // GetInputAsMap safely retrieves the input as a map[string]interface{}. // If input is not a map, it creates a map with an empty string key and the input as the value. -func (ctx *WorkflowContext) GetInputAsMap() map[string]interface{} { +func (ctx *workflowContext) GetInputAsMap() map[string]interface{} { ctx.mu.Lock() defer ctx.mu.Unlock() @@ -122,7 +165,7 @@ func (ctx *WorkflowContext) GetInputAsMap() map[string]interface{} { // GetOutputAsMap safely retrieves the output as a map[string]interface{}. // If output is not a map, it creates a map with an empty string key and the output as the value. -func (ctx *WorkflowContext) GetOutputAsMap() map[string]interface{} { +func (ctx *workflowContext) GetOutputAsMap() map[string]interface{} { ctx.mu.Lock() defer ctx.mu.Unlock() @@ -136,16 +179,16 @@ func (ctx *WorkflowContext) GetOutputAsMap() map[string]interface{} { } } -// WithWorkflowContext adds the WorkflowContext to a parent context -func WithWorkflowContext(parent context.Context, wfCtx *WorkflowContext) context.Context { +// WithWorkflowContext adds the workflowContext to a parent context +func WithWorkflowContext(parent context.Context, wfCtx WorkflowContext) context.Context { return context.WithValue(parent, runnerCtxKey, wfCtx) } -// GetWorkflowContext retrieves the WorkflowContext from a context -func GetWorkflowContext(ctx context.Context) (*WorkflowContext, error) { - wfCtx, ok := ctx.Value(runnerCtxKey).(*WorkflowContext) +// GetWorkflowContext retrieves the workflowContext from a context +func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) { + wfCtx, ok := ctx.Value(runnerCtxKey).(*workflowContext) if !ok { - return nil, errors.New("workflow context not found") + return nil, ErrWorkflowContextNotFound } return wfCtx, nil } diff --git a/impl/status_phase.go b/impl/ctx/status_phase.go similarity index 99% rename from impl/status_phase.go rename to impl/ctx/status_phase.go index ca61fad..ddcab9c 100644 --- a/impl/status_phase.go +++ b/impl/ctx/status_phase.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package impl +package ctx import "time" diff --git a/impl/expr/expr.go b/impl/expr/expr.go new file mode 100644 index 0000000..4d48589 --- /dev/null +++ b/impl/expr/expr.go @@ -0,0 +1,136 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package expr + +import ( + "context" + "errors" + "fmt" + "github.com/itchyny/gojq" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" + "github.com/serverlessworkflow/sdk-go/v3/model" +) + +func TraverseAndEvaluateWithVars(node interface{}, input interface{}, variables map[string]interface{}, nodeContext context.Context) (interface{}, error) { + if err := mergeContextInVars(nodeContext, variables); err != nil { + return nil, err + } + return traverseAndEvaluate(node, input, variables) +} + +// TraverseAndEvaluate recursively processes and evaluates all expressions in a JSON-like structure +func TraverseAndEvaluate(node interface{}, input interface{}, nodeContext context.Context) (interface{}, error) { + return TraverseAndEvaluateWithVars(node, input, map[string]interface{}{}, nodeContext) +} + +func traverseAndEvaluate(node interface{}, input interface{}, variables map[string]interface{}) (interface{}, error) { + switch v := node.(type) { + case map[string]interface{}: + // Traverse map + for key, value := range v { + evaluatedValue, err := traverseAndEvaluate(value, input, variables) + if err != nil { + return nil, err + } + v[key] = evaluatedValue + } + return v, nil + + case []interface{}: + // Traverse array + for i, value := range v { + evaluatedValue, err := traverseAndEvaluate(value, input, variables) + if err != nil { + return nil, err + } + v[i] = evaluatedValue + } + return v, nil + + case string: + // Check if the string is a runtime expression (e.g., ${ .some.path }) + if model.IsStrictExpr(v) { + return evaluateJQExpression(model.SanitizeExpr(v), input, variables) + } + return v, nil + + default: + // Return other types as-is + return v, nil + } +} + +// evaluateJQExpression evaluates a jq expression against a given JSON input +func evaluateJQExpression(expression string, input interface{}, variables map[string]interface{}) (interface{}, error) { + // Parse the sanitized jq expression + query, err := gojq.Parse(expression) + if err != nil { + return nil, fmt.Errorf("failed to parse jq expression: %s, error: %w", expression, err) + } + + code, err := gojq.Compile(query, gojq.WithVariables(getVariablesName(variables))) + if err != nil { + return nil, fmt.Errorf("failed to compile jq expression: %s, error: %w", expression, err) + } + + // Compile and evaluate the expression + iter := code.Run(input, getVariablesValue(variables)...) + result, ok := iter.Next() + if !ok { + return nil, errors.New("no result from jq evaluation") + } + + // Check if an error occurred during evaluation + if err, isErr := result.(error); isErr { + return nil, fmt.Errorf("jq evaluation error: %w", err) + } + + return result, nil +} + +func getVariablesName(variables map[string]interface{}) []string { + result := make([]string, 0, len(variables)) + for variable := range variables { + result = append(result, variable) + } + return result +} + +func getVariablesValue(variables map[string]interface{}) []interface{} { + result := make([]interface{}, 0, len(variables)) + for _, variable := range variables { + result = append(result, variable) + } + return result +} + +func mergeContextInVars(nodeCtx context.Context, variables map[string]interface{}) error { + if variables == nil { + variables = make(map[string]interface{}) + } + wfCtx, err := ctx.GetWorkflowContext(nodeCtx) + if err != nil { + if errors.Is(err, ctx.ErrWorkflowContextNotFound) { + return nil + } + return err + } + // merge + for k, val := range wfCtx.AsJQVars() { + variables[k] = val + } + + return nil +} diff --git a/impl/expr/expr_test.go b/impl/expr/expr_test.go new file mode 100644 index 0000000..f2af54a --- /dev/null +++ b/impl/expr/expr_test.go @@ -0,0 +1,263 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package expr + +import ( + "context" + "fmt" + "testing" + + "github.com/itchyny/gojq" +) + +func TestTraverseAndEvaluate(t *testing.T) { + t.Run("Simple no-expression map", func(t *testing.T) { + node := map[string]interface{}{ + "key": "value", + "num": 123, + } + result, err := TraverseAndEvaluate(node, nil, context.TODO()) + if err != nil { + t.Fatalf("TraverseAndEvaluate() unexpected error: %v", err) + } + + got, ok := result.(map[string]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluate() did not return a map") + } + if got["key"] != "value" || got["num"] != 123 { + t.Errorf("TraverseAndEvaluate() returned unexpected map data: %#v", got) + } + }) + + t.Run("Expression in map", func(t *testing.T) { + node := map[string]interface{}{ + "expr": "${ .foo }", + } + input := map[string]interface{}{ + "foo": "bar", + } + + result, err := TraverseAndEvaluate(node, input, context.TODO()) + if err != nil { + t.Fatalf("TraverseAndEvaluate() unexpected error: %v", err) + } + + got, ok := result.(map[string]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluate() did not return a map") + } + if got["expr"] != "bar" { + t.Errorf("TraverseAndEvaluate() = %v, want %v", got["expr"], "bar") + } + }) + + t.Run("Expression in array", func(t *testing.T) { + node := []interface{}{ + "static", + "${ .foo }", + } + input := map[string]interface{}{ + "foo": "bar", + } + + result, err := TraverseAndEvaluate(node, input, context.TODO()) + if err != nil { + t.Fatalf("TraverseAndEvaluate() unexpected error: %v", err) + } + + got, ok := result.([]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluate() did not return an array") + } + if got[0] != "static" { + t.Errorf("TraverseAndEvaluate()[0] = %v, want 'static'", got[0]) + } + if got[1] != "bar" { + t.Errorf("TraverseAndEvaluate()[1] = %v, want 'bar'", got[1]) + } + }) + + t.Run("Nested structures", func(t *testing.T) { + node := map[string]interface{}{ + "level1": []interface{}{ + map[string]interface{}{ + "expr": "${ .foo }", + }, + }, + } + input := map[string]interface{}{ + "foo": "nestedValue", + } + + result, err := TraverseAndEvaluate(node, input, context.TODO()) + if err != nil { + t.Fatalf("TraverseAndEvaluate() error: %v", err) + } + + resMap, ok := result.(map[string]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluate() did not return a map at top-level") + } + + level1, ok := resMap["level1"].([]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluate() did not return an array for resMap['level1']") + } + + level1Map, ok := level1[0].(map[string]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluate() did not return a map for level1[0]") + } + + if level1Map["expr"] != "nestedValue" { + t.Errorf("TraverseAndEvaluate() = %v, want %v", level1Map["expr"], "nestedValue") + } + }) + + t.Run("Invalid JQ expression", func(t *testing.T) { + node := "${ .foo( }" + input := map[string]interface{}{ + "foo": "bar", + } + + _, err := TraverseAndEvaluate(node, input, context.TODO()) + if err == nil { + t.Errorf("TraverseAndEvaluate() expected error for invalid JQ, got nil") + } + }) +} + +func TestTraverseAndEvaluateWithVars(t *testing.T) { + t.Run("Variable usage in expression", func(t *testing.T) { + node := map[string]interface{}{ + "expr": "${ $myVar }", + } + variables := map[string]interface{}{ + "$myVar": "HelloVars", + } + input := map[string]interface{}{} + + result, err := TraverseAndEvaluateWithVars(node, input, variables, context.TODO()) + if err != nil { + t.Fatalf("TraverseAndEvaluateWithVars() unexpected error: %v", err) + } + got, ok := result.(map[string]interface{}) + if !ok { + t.Fatalf("TraverseAndEvaluateWithVars() did not return a map") + } + if got["expr"] != "HelloVars" { + t.Errorf("TraverseAndEvaluateWithVars() = %v, want %v", got["expr"], "HelloVars") + } + }) + + t.Run("Reference variable that isn't defined", func(t *testing.T) { + // This tries to use a variable that isn't passed in, + // so presumably it yields an error about an undefined variable. + node := "${ $notProvided }" + input := map[string]interface{}{ + "foo": "bar", + } + variables := map[string]interface{}{} // intentionally empty + + _, err := TraverseAndEvaluateWithVars(node, input, variables, context.TODO()) + if err == nil { + t.Errorf("TraverseAndEvaluateWithVars() expected error for undefined variable, got nil") + } else { + t.Logf("Got expected error: %v", err) + } + }) +} + +func TestEvaluateJQExpressionDirect(t *testing.T) { + // This tests the core evaluator directly for errors and success. + t.Run("Successful eval", func(t *testing.T) { + expression := ".foo" + input := map[string]interface{}{"foo": "bar"} + variables := map[string]interface{}{} + result, err := callEvaluateJQ(expression, input, variables) + if err != nil { + t.Fatalf("evaluateJQExpression() error = %v, want nil", err) + } + if result != "bar" { + t.Errorf("evaluateJQExpression() = %v, want 'bar'", result) + } + }) + + t.Run("Parse error", func(t *testing.T) { + expression := ".foo(" + input := map[string]interface{}{"foo": "bar"} + variables := map[string]interface{}{} + _, err := callEvaluateJQ(expression, input, variables) + if err == nil { + t.Errorf("evaluateJQExpression() expected parse error, got nil") + } + }) + + t.Run("Runtime error in evaluation (undefined variable)", func(t *testing.T) { + expression := "$undefinedVar" + input := map[string]interface{}{ + "foo": []interface{}{1, 2}, + } + variables := map[string]interface{}{} + _, err := callEvaluateJQ(expression, input, variables) + if err == nil { + t.Errorf("callEvaluateJQ() expected runtime error, got nil") + } else { + t.Logf("Got expected error: %v", err) + } + }) +} + +// Helper to call the unexported evaluateJQExpression via a wrapper in tests. +// Alternatively, you could move `evaluateJQExpression` into a separate file that +// is also in package `expr`, then test it directly if needed. +func callEvaluateJQ(expression string, input interface{}, variables map[string]interface{}) (interface{}, error) { + // Replicate the logic from evaluateJQExpression for direct testing + query, err := gojq.Parse(expression) + if err != nil { + return nil, fmt.Errorf("failed to parse: %w", err) + } + code, err := gojq.Compile(query, gojq.WithVariables(exprGetVariableNames(variables))) + if err != nil { + return nil, fmt.Errorf("failed to compile: %w", err) + } + iter := code.Run(input, exprGetVariableValues(variables)...) + result, ok := iter.Next() + if !ok { + return nil, fmt.Errorf("no result from jq evaluation") + } + if e, isErr := result.(error); isErr { + return nil, fmt.Errorf("runtime error: %w", e) + } + return result, nil +} + +// Local copies of the variable-gathering logic from your code: +func exprGetVariableNames(variables map[string]interface{}) []string { + names := make([]string, 0, len(variables)) + for name := range variables { + names = append(names, name) + } + return names +} + +func exprGetVariableValues(variables map[string]interface{}) []interface{} { + vals := make([]interface{}, 0, len(variables)) + for _, val := range variables { + vals = append(vals, val) + } + return vals +} diff --git a/impl/runner.go b/impl/runner.go index c219886..f0cfd50 100644 --- a/impl/runner.go +++ b/impl/runner.go @@ -17,53 +17,71 @@ package impl import ( "context" "fmt" - + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" ) var _ WorkflowRunner = &workflowRunnerImpl{} +var _ TaskSupport = &workflowRunnerImpl{} + +type TaskSupport interface { + SetTaskStatus(task string, status ctx.StatusPhase) + GetWorkflowDef() *model.Workflow + SetWorkflowInstanceCtx(value interface{}) + GetContext() context.Context +} type WorkflowRunner interface { GetWorkflowDef() *model.Workflow Run(input interface{}) (output interface{}, err error) - GetContext() *WorkflowContext + GetWorkflowCtx() ctx.WorkflowContext } -func NewDefaultRunner(workflow *model.Workflow) WorkflowRunner { - wfContext := &WorkflowContext{} - wfContext.SetStatus(PendingStatus) +func NewDefaultRunner(workflow *model.Workflow) (WorkflowRunner, error) { + wfContext, err := ctx.NewWorkflowContext(workflow) + if err != nil { + return nil, err + } // TODO: based on the workflow definition, the context might change. - ctx := WithWorkflowContext(context.Background(), wfContext) + objCtx := ctx.WithWorkflowContext(context.Background(), wfContext) return &workflowRunnerImpl{ Workflow: workflow, - Context: ctx, + Context: objCtx, RunnerCtx: wfContext, - } + }, nil } type workflowRunnerImpl struct { Workflow *model.Workflow Context context.Context - RunnerCtx *WorkflowContext + RunnerCtx ctx.WorkflowContext } -func (wr *workflowRunnerImpl) GetContext() *WorkflowContext { - return wr.RunnerCtx +func (wr *workflowRunnerImpl) GetContext() context.Context { + return wr.Context } -func (wr *workflowRunnerImpl) GetTaskContext() TaskContext { +func (wr *workflowRunnerImpl) GetWorkflowCtx() ctx.WorkflowContext { return wr.RunnerCtx } +func (wr *workflowRunnerImpl) SetTaskStatus(task string, status ctx.StatusPhase) { + wr.RunnerCtx.SetTaskStatus(task, status) +} + func (wr *workflowRunnerImpl) GetWorkflowDef() *model.Workflow { return wr.Workflow } +func (wr *workflowRunnerImpl) SetWorkflowInstanceCtx(value interface{}) { + wr.RunnerCtx.SetInstanceCtx(value) +} + // Run executes the workflow synchronously. func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err error) { defer func() { if err != nil { - wr.RunnerCtx.SetStatus(FaultedStatus) + wr.RunnerCtx.SetStatus(ctx.FaultedStatus) err = wr.wrapWorkflowError(err, "/") } }() @@ -75,7 +93,7 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er wr.RunnerCtx.SetInput(input) // Run tasks sequentially - wr.RunnerCtx.SetStatus(RunningStatus) + wr.RunnerCtx.SetStatus(ctx.RunningStatus) doRunner, err := NewDoTaskRunner(wr.Workflow.Do, wr) if err != nil { return nil, err @@ -91,7 +109,7 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er } wr.RunnerCtx.SetOutput(output) - wr.RunnerCtx.SetStatus(CompletedStatus) + wr.RunnerCtx.SetStatus(ctx.CompletedStatus) return output, nil } @@ -106,11 +124,19 @@ func (wr *workflowRunnerImpl) wrapWorkflowError(err error, taskName string) erro // processInput validates and transforms input if needed. func (wr *workflowRunnerImpl) processInput(input interface{}) (output interface{}, err error) { if wr.Workflow.Input != nil { - output, err = processIO(input, wr.Workflow.Input.Schema, wr.Workflow.Input.From, "/") - if err != nil { - return nil, err + if wr.Workflow.Input.Schema != nil { + if err = validateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil { + return nil, err + } + } + + if wr.Workflow.Input.From != nil { + output, err = traverseAndEvaluate(wr.Workflow.Input.From, input, "/", wr.Context) + if err != nil { + return nil, err + } + return output, nil } - return output, nil } return input, nil } @@ -118,7 +144,18 @@ func (wr *workflowRunnerImpl) processInput(input interface{}) (output interface{ // processOutput applies output transformations. func (wr *workflowRunnerImpl) processOutput(output interface{}) (interface{}, error) { if wr.Workflow.Output != nil { - return processIO(output, wr.Workflow.Output.Schema, wr.Workflow.Output.As, "/") + if wr.Workflow.Output.As != nil { + var err error + output, err = traverseAndEvaluate(wr.Workflow.Output.As, output, "/", wr.Context) + if err != nil { + return nil, err + } + } + if wr.Workflow.Output.Schema != nil { + if err := validateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil { + return nil, err + } + } } return output, nil } diff --git a/impl/task_runner.go b/impl/task_runner.go index 05d3817..a64c329 100644 --- a/impl/task_runner.go +++ b/impl/task_runner.go @@ -19,7 +19,7 @@ import ( "reflect" "strings" - "github.com/serverlessworkflow/sdk-go/v3/expr" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -32,19 +32,21 @@ type TaskRunner interface { GetTaskName() string } -func NewSetTaskRunner(taskName string, task *model.SetTask) (*SetTaskRunner, error) { +func NewSetTaskRunner(taskName string, task *model.SetTask, taskSupport TaskSupport) (*SetTaskRunner, error) { if task == nil || task.Set == nil { return nil, model.NewErrValidation(fmt.Errorf("no set configuration provided for SetTask %s", taskName), taskName) } return &SetTaskRunner{ - Task: task, - TaskName: taskName, + Task: task, + TaskName: taskName, + TaskSupport: taskSupport, }, nil } type SetTaskRunner struct { - Task *model.SetTask - TaskName string + Task *model.SetTask + TaskName string + TaskSupport TaskSupport } func (s *SetTaskRunner) GetTaskName() string { @@ -53,9 +55,9 @@ func (s *SetTaskRunner) GetTaskName() string { func (s *SetTaskRunner) Run(input interface{}) (output interface{}, err error) { setObject := deepClone(s.Task.Set) - result, err := expr.TraverseAndEvaluate(setObject, input) + result, err := traverseAndEvaluate(model.NewObjectOrRuntimeExpr(setObject), input, s.TaskName, s.TaskSupport.GetContext()) if err != nil { - return nil, model.NewErrExpression(err, s.TaskName) + return nil, err } output, ok := result.(map[string]interface{}) @@ -66,16 +68,18 @@ func (s *SetTaskRunner) Run(input interface{}) (output interface{}, err error) { return output, nil } -func NewRaiseTaskRunner(taskName string, task *model.RaiseTask, workflowDef *model.Workflow) (*RaiseTaskRunner, error) { - if err := resolveErrorDefinition(task, workflowDef); err != nil { +func NewRaiseTaskRunner(taskName string, task *model.RaiseTask, taskSupport TaskSupport) (*RaiseTaskRunner, error) { + if err := resolveErrorDefinition(task, taskSupport.GetWorkflowDef()); err != nil { return nil, err } + if task.Raise.Error.Definition == nil { return nil, model.NewErrValidation(fmt.Errorf("no raise configuration provided for RaiseTask %s", taskName), taskName) } return &RaiseTaskRunner{ - Task: task, - TaskName: taskName, + Task: task, + TaskName: taskName, + TaskSupport: taskSupport, }, nil } @@ -97,8 +101,9 @@ func resolveErrorDefinition(t *model.RaiseTask, workflowDef *model.Workflow) err } type RaiseTaskRunner struct { - Task *model.RaiseTask - TaskName string + Task *model.RaiseTask + TaskName string + TaskSupport TaskSupport } var raiseErrFuncMapping = map[string]func(error, string) *model.Error{ @@ -116,13 +121,13 @@ func (r *RaiseTaskRunner) Run(input interface{}) (output interface{}, err error) output = input // TODO: make this an external func so we can call it after getting the reference? Or we can get the reference from the workflow definition var detailResult interface{} - detailResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName) + detailResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName, r.TaskSupport.GetContext()) if err != nil { return nil, err } var titleResult interface{} - titleResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName) + titleResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName, r.TaskSupport.GetContext()) if err != nil { return nil, err } @@ -159,9 +164,10 @@ func NewForTaskRunner(taskName string, task *model.ForTask, taskSupport TaskSupp } return &ForTaskRunner{ - Task: task, - TaskName: taskName, - DoRunner: doRunner, + Task: task, + TaskName: taskName, + DoRunner: doRunner, + TaskSupport: taskSupport, }, nil } @@ -171,14 +177,15 @@ const ( ) type ForTaskRunner struct { - Task *model.ForTask - TaskName string - DoRunner *DoTaskRunner + Task *model.ForTask + TaskName string + DoRunner *DoTaskRunner + TaskSupport TaskSupport } func (f *ForTaskRunner) Run(input interface{}) (interface{}, error) { f.sanitizeFor() - in, err := expr.TraverseAndEvaluate(f.Task.For.In, input) + in, err := expr.TraverseAndEvaluate(f.Task.For.In, input, f.TaskSupport.GetContext()) if err != nil { return nil, err } diff --git a/impl/task_runner_do.go b/impl/task_runner_do.go index a34a4dd..07c0e29 100644 --- a/impl/task_runner_do.go +++ b/impl/task_runner_do.go @@ -16,27 +16,23 @@ package impl import ( "fmt" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" - "github.com/serverlessworkflow/sdk-go/v3/expr" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" "github.com/serverlessworkflow/sdk-go/v3/model" ) var _ TaskRunner = &DoTaskRunner{} -type TaskSupport interface { - GetTaskContext() TaskContext - GetWorkflowDef() *model.Workflow -} - // TODO: refactor to receive a resolver handler instead of the workflow runner // NewTaskRunner creates a TaskRunner instance based on the task type. func NewTaskRunner(taskName string, task model.Task, taskSupport TaskSupport) (TaskRunner, error) { switch t := task.(type) { case *model.SetTask: - return NewSetTaskRunner(taskName, t) + return NewSetTaskRunner(taskName, t, taskSupport) case *model.RaiseTask: - return NewRaiseTaskRunner(taskName, t, taskSupport.GetWorkflowDef()) + return NewRaiseTaskRunner(taskName, t, taskSupport) case *model.DoTask: return NewDoTaskRunner(t.Do, taskSupport) case *model.ForTask: @@ -78,7 +74,6 @@ func (d *DoTaskRunner) executeTasks(input interface{}, tasks *model.TaskList) (o idx := 0 currentTask := (*tasks)[idx] - ctx := d.TaskSupport.GetTaskContext() for currentTask != nil { if shouldRun, err := d.shouldRunTask(input, currentTask); err != nil { @@ -88,19 +83,19 @@ func (d *DoTaskRunner) executeTasks(input interface{}, tasks *model.TaskList) (o continue } - ctx.SetTaskStatus(currentTask.Key, PendingStatus) + d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus) runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, d.TaskSupport) if err != nil { return output, err } - ctx.SetTaskStatus(currentTask.Key, RunningStatus) + d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.RunningStatus) if output, err = d.runTask(input, runner, currentTask.Task.GetBase()); err != nil { - ctx.SetTaskStatus(currentTask.Key, FaultedStatus) + d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus) return output, err } - ctx.SetTaskStatus(currentTask.Key, CompletedStatus) + d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus) input = deepCloneValue(output) idx, currentTask = tasks.Next(idx) } @@ -110,7 +105,7 @@ func (d *DoTaskRunner) executeTasks(input interface{}, tasks *model.TaskList) (o func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (bool, error) { if task.GetBase().If != nil { - output, err := expr.TraverseAndEvaluate(task.GetBase().If.String(), input) + output, err := expr.TraverseAndEvaluate(task.GetBase().If.String(), input, d.TaskSupport.GetContext()) if err != nil { return false, model.NewErrExpression(err, task.Key) } @@ -140,6 +135,10 @@ func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model return nil, err } + if err = d.processTaskExport(task, output, taskName); err != nil { + return nil, err + } + return output, nil } @@ -153,7 +152,7 @@ func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interfac return nil, err } - if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName); err != nil { + if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, d.TaskSupport.GetContext()); err != nil { return nil, err } @@ -166,7 +165,7 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf return taskOutput, nil } - if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName); err != nil { + if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, d.TaskSupport.GetContext()); err != nil { return nil, err } @@ -176,3 +175,22 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf return output, nil } + +func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interface{}, taskName string) (err error) { + if task.Export == nil { + return nil + } + + output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, d.TaskSupport.GetContext()) + if err != nil { + return err + } + + if err = validateSchema(output, task.Export.Schema, taskName); err != nil { + return nil + } + + d.TaskSupport.SetWorkflowInstanceCtx(output) + + return nil +} diff --git a/impl/task_runner_raise_test.go b/impl/task_runner_raise_test.go index 3527283..1f0af1a 100644 --- a/impl/task_runner_raise_test.go +++ b/impl/task_runner_raise_test.go @@ -39,7 +39,7 @@ func TestRaiseTaskRunner_WithDefinedError(t *testing.T) { }, } - runner, err := NewRaiseTaskRunner("task_raise_defined", raiseTask, nil) + runner, err := NewRaiseTaskRunner("task_raise_defined", raiseTask, newTaskSupport()) assert.NoError(t, err) output, err := runner.Run(input) @@ -70,7 +70,7 @@ func TestRaiseTaskRunner_WithReferencedError(t *testing.T) { }, } - runner, err := NewRaiseTaskRunner("task_raise_ref", raiseTask, nil) + runner, err := NewRaiseTaskRunner("task_raise_ref", raiseTask, newTaskSupport()) assert.Error(t, err) assert.Nil(t, runner) } @@ -93,7 +93,7 @@ func TestRaiseTaskRunner_TimeoutErrorWithExpression(t *testing.T) { }, } - runner, err := NewRaiseTaskRunner("task_raise_timeout_expr", raiseTask, nil) + runner, err := NewRaiseTaskRunner("task_raise_timeout_expr", raiseTask, newTaskSupport()) assert.NoError(t, err) output, err := runner.Run(input) diff --git a/impl/task_runner_test.go b/impl/task_runner_test.go index c5a76d7..4f6ab28 100644 --- a/impl/task_runner_test.go +++ b/impl/task_runner_test.go @@ -15,6 +15,8 @@ package impl import ( + "context" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "os" "path/filepath" "testing" @@ -24,6 +26,42 @@ import ( "github.com/stretchr/testify/assert" ) +type taskSupportOpts func(*workflowRunnerImpl) + +// newTaskSupport returns an instance of TaskSupport for test purposes +func newTaskSupport(opts ...taskSupportOpts) TaskSupport { + ts := &workflowRunnerImpl{ + Workflow: nil, + Context: context.TODO(), + RunnerCtx: nil, + } + + // Apply each functional option to ts + for _, opt := range opts { + opt(ts) + } + + return ts +} + +func withWorkflow(wf *model.Workflow) taskSupportOpts { + return func(ts *workflowRunnerImpl) { + ts.Workflow = wf + } +} + +func withContext(ctx context.Context) taskSupportOpts { + return func(ts *workflowRunnerImpl) { + ts.Context = ctx + } +} + +func withRunnerCtx(workflowContext ctx.WorkflowContext) taskSupportOpts { + return func(ts *workflowRunnerImpl) { + ts.RunnerCtx = workflowContext + } +} + // runWorkflowTest is a reusable test function for workflows func runWorkflowTest(t *testing.T, workflowPath string, input, expectedOutput map[string]interface{}) { // Run the workflow @@ -50,7 +88,8 @@ func runWorkflow(t *testing.T, workflowPath string, input, expectedOutput map[st assert.NoError(t, err, "Failed to parse workflow YAML") // Initialize the workflow runner - runner := NewDefaultRunner(workflow) + runner, err := NewDefaultRunner(workflow) + assert.NoError(t, err) // Run the workflow output, err = runner.Run(input) @@ -151,7 +190,8 @@ func TestWorkflowRunner_Run_YAML_WithSchemaValidation(t *testing.T) { assert.NoError(t, err, "Failed to read workflow YAML file") workflow, err := parser.FromYAMLSource(yamlBytes) assert.NoError(t, err, "Failed to parse workflow YAML") - runner := NewDefaultRunner(workflow) + runner, err := NewDefaultRunner(workflow) + assert.NoError(t, err) _, err = runner.Run(input) assert.Error(t, err, "Expected validation error for invalid input") assert.Contains(t, err.Error(), "JSON schema validation failed") @@ -178,7 +218,8 @@ func TestWorkflowRunner_Run_YAML_WithSchemaValidation(t *testing.T) { assert.NoError(t, err, "Failed to read workflow YAML file") workflow, err := parser.FromYAMLSource(yamlBytes) assert.NoError(t, err, "Failed to parse workflow YAML") - runner := NewDefaultRunner(workflow) + runner, err := NewDefaultRunner(workflow) + assert.NoError(t, err) _, err = runner.Run(input) assert.Error(t, err, "Expected validation error for invalid task input") assert.Contains(t, err.Error(), "JSON schema validation failed") @@ -205,7 +246,8 @@ func TestWorkflowRunner_Run_YAML_WithSchemaValidation(t *testing.T) { assert.NoError(t, err, "Failed to read workflow YAML file") workflow, err := parser.FromYAMLSource(yamlBytes) assert.NoError(t, err, "Failed to parse workflow YAML") - runner := NewDefaultRunner(workflow) + runner, err := NewDefaultRunner(workflow) + assert.NoError(t, err) _, err = runner.Run(input) assert.Error(t, err, "Expected validation error for invalid task output") assert.Contains(t, err.Error(), "JSON schema validation failed") diff --git a/impl/task_set_test.go b/impl/task_set_test.go index 48ca18b..c1d5534 100644 --- a/impl/task_set_test.go +++ b/impl/task_set_test.go @@ -45,7 +45,7 @@ func TestSetTaskExecutor_Exec(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task1", setTask) + executor, err := NewSetTaskRunner("task1", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -79,7 +79,7 @@ func TestSetTaskExecutor_StaticValues(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_static", setTask) + executor, err := NewSetTaskRunner("task_static", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -109,7 +109,7 @@ func TestSetTaskExecutor_RuntimeExpressions(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_runtime_expr", setTask) + executor, err := NewSetTaskRunner("task_runtime_expr", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -141,7 +141,7 @@ func TestSetTaskExecutor_NestedStructures(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_nested_structures", setTask) + executor, err := NewSetTaskRunner("task_nested_structures", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -176,7 +176,7 @@ func TestSetTaskExecutor_StaticAndDynamicValues(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_static_dynamic", setTask) + executor, err := NewSetTaskRunner("task_static_dynamic", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -201,7 +201,7 @@ func TestSetTaskExecutor_MissingInputData(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_missing_input", setTask) + executor, err := NewSetTaskRunner("task_missing_input", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -220,7 +220,7 @@ func TestSetTaskExecutor_ExpressionsWithFunctions(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_expr_functions", setTask) + executor, err := NewSetTaskRunner("task_expr_functions", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -246,7 +246,7 @@ func TestSetTaskExecutor_ConditionalExpressions(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_conditional_expr", setTask) + executor, err := NewSetTaskRunner("task_conditional_expr", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -273,7 +273,7 @@ func TestSetTaskExecutor_ArrayDynamicIndex(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_array_indexing", setTask) + executor, err := NewSetTaskRunner("task_array_indexing", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -299,7 +299,7 @@ func TestSetTaskExecutor_NestedConditionalLogic(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_nested_condition", setTask) + executor, err := NewSetTaskRunner("task_nested_condition", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -323,7 +323,7 @@ func TestSetTaskExecutor_DefaultValues(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_default_values", setTask) + executor, err := NewSetTaskRunner("task_default_values", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -363,7 +363,7 @@ func TestSetTaskExecutor_ComplexNestedStructures(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_complex_nested", setTask) + executor, err := NewSetTaskRunner("task_complex_nested", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) @@ -399,7 +399,7 @@ func TestSetTaskExecutor_MultipleExpressions(t *testing.T) { }, } - executor, err := NewSetTaskRunner("task_multiple_expr", setTask) + executor, err := NewSetTaskRunner("task_multiple_expr", setTask, newTaskSupport()) assert.NoError(t, err) output, err := executor.Run(input) diff --git a/impl/utils.go b/impl/utils.go index 2cdf952..796c64d 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -15,7 +15,8 @@ package impl import ( - "github.com/serverlessworkflow/sdk-go/v3/expr" + "context" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -51,31 +52,13 @@ func validateSchema(data interface{}, schema *model.Schema, taskName string) err return nil } -func traverseAndEvaluate(runtimeExpr *model.ObjectOrRuntimeExpr, input interface{}, taskName string) (output interface{}, err error) { +func traverseAndEvaluate(runtimeExpr *model.ObjectOrRuntimeExpr, input interface{}, taskName string, wfCtx context.Context) (output interface{}, err error) { if runtimeExpr == nil { return input, nil } - output, err = expr.TraverseAndEvaluate(runtimeExpr.AsStringOrMap(), input) + output, err = expr.TraverseAndEvaluate(runtimeExpr.AsStringOrMap(), input, wfCtx) if err != nil { return nil, model.NewErrExpression(err, taskName) } return output, nil } - -func processIO(data interface{}, schema *model.Schema, transformation *model.ObjectOrRuntimeExpr, taskName string) (interface{}, error) { - if schema != nil { - if err := validateSchema(data, schema, taskName); err != nil { - return nil, err - } - } - - if transformation != nil { - transformed, err := traverseAndEvaluate(transformation, data, taskName) - if err != nil { - return nil, err - } - return transformed, nil - } - - return data, nil -} diff --git a/model/objects.go b/model/objects.go index d79ac55..2bb8dd9 100644 --- a/model/objects.go +++ b/model/objects.go @@ -73,6 +73,12 @@ type ObjectOrRuntimeExpr struct { Value interface{} `json:"-" validate:"object_or_runtime_expr"` // Custom validation tag. } +func NewObjectOrRuntimeExpr(value interface{}) *ObjectOrRuntimeExpr { + return &ObjectOrRuntimeExpr{ + Value: value, + } +} + func (o *ObjectOrRuntimeExpr) String() string { return fmt.Sprintf("%v", o.Value) } diff --git a/model/runtime_expression.go b/model/runtime_expression.go index 6a056cb..ae04e46 100644 --- a/model/runtime_expression.go +++ b/model/runtime_expression.go @@ -17,8 +17,8 @@ package model import ( "encoding/json" "fmt" - - "github.com/serverlessworkflow/sdk-go/v3/expr" + "github.com/itchyny/gojq" + "strings" ) // RuntimeExpression represents a runtime expression. @@ -34,9 +34,34 @@ func NewExpr(runtimeExpression string) *RuntimeExpression { return &RuntimeExpression{Value: runtimeExpression} } +// IsStrictExpr returns true if the string is enclosed in `${ }` +func IsStrictExpr(expression string) bool { + return strings.HasPrefix(expression, "${") && strings.HasSuffix(expression, "}") +} + +// SanitizeExpr processes the expression to ensure it's ready for evaluation +// It removes `${}` if present and replaces single quotes with double quotes +func SanitizeExpr(expression string) string { + // Remove `${}` enclosure if present + if IsStrictExpr(expression) { + expression = strings.TrimSpace(expression[2 : len(expression)-1]) + } + + // Replace single quotes with double quotes + expression = strings.ReplaceAll(expression, "'", "\"") + + return expression +} + +func IsValidExpr(expression string) bool { + expression = SanitizeExpr(expression) + _, err := gojq.Parse(expression) + return err == nil +} + // IsValid checks if the RuntimeExpression value is valid, handling both with and without `${}`. func (r *RuntimeExpression) IsValid() bool { - return expr.IsValid(r.Value) + return IsValidExpr(r.Value) } // UnmarshalJSON implements custom unmarshalling for RuntimeExpression. diff --git a/model/runtime_expression_test.go b/model/runtime_expression_test.go index 296e1de..770af70 100644 --- a/model/runtime_expression_test.go +++ b/model/runtime_expression_test.go @@ -68,3 +68,152 @@ func TestRuntimeExpressionUnmarshalJSON(t *testing.T) { type RuntimeExpressionAcme struct { Expression RuntimeExpression `json:"expression"` } + +func TestIsStrictExpr(t *testing.T) { + tests := []struct { + name string + expression string + want bool + }{ + { + name: "StrictExpr with braces", + expression: "${.some.path}", + want: true, + }, + { + name: "Missing closing brace", + expression: "${.some.path", + want: false, + }, + { + name: "Missing opening brace", + expression: ".some.path}", + want: false, + }, + { + name: "Empty string", + expression: "", + want: false, + }, + { + name: "No braces at all", + expression: ".some.path", + want: false, + }, + { + name: "With spaces but still correct", + expression: "${ .some.path }", + want: true, + }, + { + name: "Only braces", + expression: "${}", + want: true, // Technically matches prefix+suffix + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := IsStrictExpr(tc.expression) + if got != tc.want { + t.Errorf("IsStrictExpr(%q) = %v, want %v", tc.expression, got, tc.want) + } + }) + } +} + +func TestSanitize(t *testing.T) { + tests := []struct { + name string + expression string + want string + }{ + { + name: "Remove braces and replace single quotes", + expression: "${ 'some.path' }", + want: `"some.path"`, + }, + { + name: "Already sanitized string, no braces", + expression: ".some.path", + want: ".some.path", + }, + { + name: "Multiple single quotes", + expression: "${ 'foo' + 'bar' }", + want: `"foo" + "bar"`, + }, + { + name: "Only braces with spaces", + expression: "${ }", + want: "", + }, + { + name: "No braces, just single quotes to be replaced", + expression: "'some.path'", + want: `"some.path"`, + }, + { + name: "Nothing to sanitize", + expression: "", + want: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := SanitizeExpr(tc.expression) + if got != tc.want { + t.Errorf("Sanitize(%q) = %q, want %q", tc.expression, got, tc.want) + } + }) + } +} + +func TestIsValid(t *testing.T) { + tests := []struct { + name string + expression string + want bool + }{ + { + name: "Valid expression - simple path", + expression: "${ .foo }", + want: true, + }, + { + name: "Valid expression - array slice", + expression: "${ .arr[0] }", + want: true, + }, + { + name: "Invalid syntax", + expression: "${ .foo( }", + want: false, + }, + { + name: "No braces but valid JQ (directly provided)", + expression: ".bar", + want: true, + }, + { + name: "Empty expression", + expression: "", + want: true, // empty is parseable but yields an empty query + }, + { + name: "Invalid bracket usage", + expression: "${ .arr[ }", + want: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := IsValidExpr(tc.expression) + if got != tc.want { + t.Errorf("IsValid(%q) = %v, want %v", tc.expression, got, tc.want) + } + }) + } +} diff --git a/model/workflow.go b/model/workflow.go index 313a9e5..15dba7e 100644 --- a/model/workflow.go +++ b/model/workflow.go @@ -31,6 +31,20 @@ type Workflow struct { Schedule *Schedule `json:"schedule,omitempty" yaml:"schedule,omitempty"` } +// AsMap converts the Workflow struct into a JSON Map object. +func (w *Workflow) AsMap() (map[string]interface{}, error) { + jsonBytes, err := json.Marshal(w) + if err != nil { + return nil, err + } + + var m map[string]interface{} + if err = json.Unmarshal(jsonBytes, &m); err != nil { + return nil, err + } + return m, nil +} + func (w *Workflow) MarshalYAML() (interface{}, error) { // Create a map to hold fields data := map[string]interface{}{ From 598a8e0733d196c1df607aa373aa609c3abc0f66 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Thu, 27 Mar 2025 16:26:43 -0400 Subject: [PATCH 2/4] Fix #229 - Implement For task and refactor jq expr into context Signed-off-by: Ricardo Zanini --- README.md | 2 +- go.mod | 2 + go.sum | 4 + impl/ctx/context.go | 281 +++++++++++++++--- impl/expr/expr.go | 37 ++- impl/json_pointer.go | 77 +++++ .../json_pointer_test.go | 84 +++--- impl/runner.go | 71 ++++- impl/{task_runner_test.go => runner_test.go} | 57 +++- impl/task_runner.go | 254 ++-------------- impl/task_runner_do.go | 32 +- impl/task_runner_for.go | 135 +++++++++ impl/task_runner_raise.go | 105 +++++++ impl/task_runner_raise_test.go | 13 +- impl/task_runner_set.go | 56 ++++ impl/testdata/for_nested_loops.yaml | 21 ++ impl/testdata/for_sum_numbers.yaml | 16 + impl/testdata/raise_inline.yaml | 2 +- impl/utils.go | 14 + model/errors.go | 63 +--- 20 files changed, 903 insertions(+), 423 deletions(-) create mode 100644 impl/json_pointer.go rename model/errors_test.go => impl/json_pointer_test.go (53%) rename impl/{task_runner_test.go => runner_test.go} (89%) create mode 100644 impl/task_runner_for.go create mode 100644 impl/task_runner_raise.go create mode 100644 impl/task_runner_set.go create mode 100644 impl/testdata/for_nested_loops.yaml create mode 100644 impl/testdata/for_sum_numbers.yaml diff --git a/README.md b/README.md index 9daabf0..f05e54c 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,7 @@ The table below lists the current state of this implementation. This table is a | Task Call | ❌ | | Task Do | ✅ | | Task Emit | ❌ | -| Task For | ❌ | +| Task For | ✅ | | Task Fork | ❌ | | Task Listen | ❌ | | Task Raise | ✅ | diff --git a/go.mod b/go.mod index 32f8859..e7947a8 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,11 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/google/go-cmp v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/relvacode/iso8601 v1.6.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect diff --git a/go.sum b/go.sum index 80ed15c..e6e3d38 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1 github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg= github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY= github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= @@ -23,6 +25,8 @@ github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjS github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU= +github.com/relvacode/iso8601 v1.6.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= diff --git a/impl/ctx/context.go b/impl/ctx/context.go index d5aa920..1f0d716 100644 --- a/impl/ctx/context.go +++ b/impl/ctx/context.go @@ -16,9 +16,13 @@ package ctx import ( "context" + "encoding/json" "errors" + "fmt" + "github.com/google/uuid" "github.com/serverlessworkflow/sdk-go/v3/model" "sync" + "time" ) var ErrWorkflowContextNotFound = errors.New("workflow context not found") @@ -29,15 +33,23 @@ type ctxKey string const ( runnerCtxKey ctxKey = "wfRunnerContext" - varsContext = "$context" - varsInput = "$input" - varsOutput = "$output" - varsWorkflow = "$workflow" + + varsContext = "$context" + varsInput = "$input" + varsOutput = "$output" + varsWorkflow = "$workflow" + varsRuntime = "$runtime" + varsTask = "$task" + + // TODO: script during the release to update this value programmatically + runtimeVersion = "v3.1.0" + runtimeName = "CNCF Serverless Workflow Specification Go SDK" ) type WorkflowContext interface { + SetStartedAt(t time.Time) SetStatus(status StatusPhase) - SetTaskStatus(task string, status StatusPhase) + SetRawInput(input interface{}) SetInstanceCtx(value interface{}) GetInstanceCtx() interface{} SetInput(input interface{}) @@ -45,18 +57,32 @@ type WorkflowContext interface { SetOutput(output interface{}) GetOutput() interface{} GetOutputAsMap() map[string]interface{} - AsJQVars() map[string]interface{} + GetVars() map[string]interface{} + SetTaskStatus(task string, status StatusPhase) + SetTaskRawInput(input interface{}) + SetTaskRawOutput(output interface{}) + SetTaskDef(task model.Task) error + SetTaskStartedAt(startedAt time.Time) + SetTaskName(name string) + SetTaskReference(ref string) + GetTaskReference() string + ClearTaskContext() + SetLocalExprVars(vars map[string]interface{}) + AddLocalExprVars(vars map[string]interface{}) + RemoveLocalExprVars(keys ...string) } // workflowContext holds the necessary data for the workflow execution within the instance. type workflowContext struct { - mu sync.Mutex - input interface{} // $input can hold any type - output interface{} // $output can hold any type - context map[string]interface{} // Holds `$context` as the key - definition map[string]interface{} // $workflow representation in the context - StatusPhase []StatusPhaseLog - TasksStatusPhase map[string][]StatusPhaseLog + mu sync.Mutex + input interface{} // $input can hold any type + output interface{} // $output can hold any type + context map[string]interface{} // Holds `$context` as the key + workflowDescriptor map[string]interface{} // $workflow representation in the context + taskDescriptor map[string]interface{} // $task representation in the context + localExprVars map[string]interface{} // Local expression variables defined in a given task or private context. E.g. a For task $item. + StatusPhase []StatusPhaseLog + TasksStatusPhase map[string][]StatusPhaseLog } func NewWorkflowContext(workflow *model.Workflow) (WorkflowContext, error) { @@ -65,19 +91,110 @@ func NewWorkflowContext(workflow *model.Workflow) (WorkflowContext, error) { if err != nil { return nil, err } - - workflowCtx.definition = workflowDef + workflowCtx.taskDescriptor = map[string]interface{}{} + workflowCtx.workflowDescriptor = map[string]interface{}{ + varsWorkflow: map[string]interface{}{ + "id": uuid.NewString(), + "definition": workflowDef, + }, + } workflowCtx.SetStatus(PendingStatus) return workflowCtx, nil } -func (ctx *workflowContext) AsJQVars() map[string]interface{} { +// WithWorkflowContext adds the workflowContext to a parent context +func WithWorkflowContext(parent context.Context, wfCtx WorkflowContext) context.Context { + return context.WithValue(parent, runnerCtxKey, wfCtx) +} + +// GetWorkflowContext retrieves the workflowContext from a context +func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) { + wfCtx, ok := ctx.Value(runnerCtxKey).(*workflowContext) + if !ok { + return nil, ErrWorkflowContextNotFound + } + return wfCtx, nil +} + +func (ctx *workflowContext) SetStartedAt(t time.Time) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + wf, ok := ctx.workflowDescriptor[varsWorkflow].(map[string]interface{}) + if !ok { + wf = make(map[string]interface{}) + ctx.workflowDescriptor[varsWorkflow] = wf + } + + startedAt, ok := wf["startedAt"].(map[string]interface{}) + if !ok { + startedAt = make(map[string]interface{}) + wf["startedAt"] = startedAt + } + + startedAt["iso8601"] = t.UTC().Format(time.RFC3339) +} + +func (ctx *workflowContext) SetRawInput(input interface{}) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + // Ensure the outer "workflow" map + wf, ok := ctx.workflowDescriptor[varsWorkflow].(map[string]interface{}) + if !ok { + wf = make(map[string]interface{}) + ctx.workflowDescriptor[varsWorkflow] = wf + } + + // Store the input + wf["input"] = input +} + +func (ctx *workflowContext) AddLocalExprVars(vars map[string]interface{}) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + if ctx.localExprVars == nil { + ctx.localExprVars = map[string]interface{}{} + } + for k, v := range vars { + ctx.localExprVars[k] = v + } +} + +func (ctx *workflowContext) RemoveLocalExprVars(keys ...string) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + if ctx.localExprVars == nil { + return + } + + for _, k := range keys { + delete(ctx.localExprVars, k) + } +} + +func (ctx *workflowContext) SetLocalExprVars(vars map[string]interface{}) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + ctx.localExprVars = vars +} + +func (ctx *workflowContext) GetVars() map[string]interface{} { vars := make(map[string]interface{}) vars[varsInput] = ctx.GetInput() vars[varsOutput] = ctx.GetOutput() vars[varsContext] = ctx.GetInstanceCtx() - vars[varsOutput] = ctx.definition + vars[varsTask] = ctx.taskDescriptor[varsTask] + vars[varsWorkflow] = ctx.workflowDescriptor[varsWorkflow] + vars[varsRuntime] = map[string]interface{}{ + "name": runtimeName, + "version": runtimeVersion, + } + for varName, varValue := range ctx.localExprVars { + vars[varName] = varValue + } return vars } @@ -90,15 +207,6 @@ func (ctx *workflowContext) SetStatus(status StatusPhase) { ctx.StatusPhase = append(ctx.StatusPhase, NewStatusPhaseLog(status)) } -func (ctx *workflowContext) SetTaskStatus(task string, status StatusPhase) { - ctx.mu.Lock() - defer ctx.mu.Unlock() - if ctx.TasksStatusPhase == nil { - ctx.TasksStatusPhase = map[string][]StatusPhaseLog{} - } - ctx.TasksStatusPhase[task] = append(ctx.TasksStatusPhase[task], NewStatusPhaseLog(status)) -} - // SetInstanceCtx safely sets the `$context` value func (ctx *workflowContext) SetInstanceCtx(value interface{}) { ctx.mu.Lock() @@ -179,16 +287,121 @@ func (ctx *workflowContext) GetOutputAsMap() map[string]interface{} { } } -// WithWorkflowContext adds the workflowContext to a parent context -func WithWorkflowContext(parent context.Context, wfCtx WorkflowContext) context.Context { - return context.WithValue(parent, runnerCtxKey, wfCtx) +func (ctx *workflowContext) SetTaskStatus(task string, status StatusPhase) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + if ctx.TasksStatusPhase == nil { + ctx.TasksStatusPhase = map[string][]StatusPhaseLog{} + } + ctx.TasksStatusPhase[task] = append(ctx.TasksStatusPhase[task], NewStatusPhaseLog(status)) } -// GetWorkflowContext retrieves the workflowContext from a context -func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) { - wfCtx, ok := ctx.Value(runnerCtxKey).(*workflowContext) +func (ctx *workflowContext) SetTaskRawInput(input interface{}) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) if !ok { - return nil, ErrWorkflowContextNotFound + task = make(map[string]interface{}) + ctx.taskDescriptor[varsTask] = task } - return wfCtx, nil + + task["input"] = input +} + +func (ctx *workflowContext) SetTaskRawOutput(output interface{}) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) + if !ok { + task = make(map[string]interface{}) + ctx.taskDescriptor[varsTask] = task + } + + task["output"] = output +} + +func (ctx *workflowContext) SetTaskDef(task model.Task) error { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + if task == nil { + return errors.New("SetTaskDef called with nil model.Task") + } + + defBytes, err := json.Marshal(task) + if err != nil { + return fmt.Errorf("failed to marshal task: %w", err) + } + + var defMap map[string]interface{} + if err := json.Unmarshal(defBytes, &defMap); err != nil { + return fmt.Errorf("failed to unmarshal task into map: %w", err) + } + + taskMap, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) + if !ok { + taskMap = make(map[string]interface{}) + ctx.taskDescriptor[varsTask] = taskMap + } + + taskMap["definition"] = defMap + + return nil +} + +func (ctx *workflowContext) SetTaskStartedAt(startedAt time.Time) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) + if !ok { + task = make(map[string]interface{}) + ctx.taskDescriptor[varsTask] = task + } + + task["startedAt"] = startedAt.UTC().Format(time.RFC3339) +} + +func (ctx *workflowContext) SetTaskName(name string) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) + if !ok { + task = make(map[string]interface{}) + ctx.taskDescriptor[varsTask] = task + } + + task["name"] = name +} + +func (ctx *workflowContext) SetTaskReference(ref string) { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) + if !ok { + task = make(map[string]interface{}) + ctx.taskDescriptor[varsTask] = task + } + + task["reference"] = ref +} + +func (ctx *workflowContext) GetTaskReference() string { + ctx.mu.Lock() + defer ctx.mu.Unlock() + task, ok := ctx.taskDescriptor[varsTask].(map[string]interface{}) + if !ok { + return "" + } + return task["reference"].(string) +} + +func (ctx *workflowContext) ClearTaskContext() { + ctx.mu.Lock() + defer ctx.mu.Unlock() + ctx.taskDescriptor[varsTask] = make(map[string]interface{}) } diff --git a/impl/expr/expr.go b/impl/expr/expr.go index 4d48589..03d558e 100644 --- a/impl/expr/expr.go +++ b/impl/expr/expr.go @@ -74,46 +74,43 @@ func traverseAndEvaluate(node interface{}, input interface{}, variables map[stri // evaluateJQExpression evaluates a jq expression against a given JSON input func evaluateJQExpression(expression string, input interface{}, variables map[string]interface{}) (interface{}, error) { - // Parse the sanitized jq expression query, err := gojq.Parse(expression) if err != nil { return nil, fmt.Errorf("failed to parse jq expression: %s, error: %w", expression, err) } - code, err := gojq.Compile(query, gojq.WithVariables(getVariablesName(variables))) + // Get the variable names & values in a single pass: + names, values := getVariableNamesAndValues(variables) + + code, err := gojq.Compile(query, gojq.WithVariables(names)) if err != nil { return nil, fmt.Errorf("failed to compile jq expression: %s, error: %w", expression, err) } - // Compile and evaluate the expression - iter := code.Run(input, getVariablesValue(variables)...) + iter := code.Run(input, values...) result, ok := iter.Next() if !ok { return nil, errors.New("no result from jq evaluation") } - // Check if an error occurred during evaluation - if err, isErr := result.(error); isErr { - return nil, fmt.Errorf("jq evaluation error: %w", err) + // If there's an error from the jq engine, report it + if errVal, isErr := result.(error); isErr { + return nil, fmt.Errorf("jq evaluation error: %w", errVal) } return result, nil } -func getVariablesName(variables map[string]interface{}) []string { - result := make([]string, 0, len(variables)) - for variable := range variables { - result = append(result, variable) - } - return result -} +// getVariableNamesAndValues constructs two slices, where 'names[i]' matches 'values[i]'. +func getVariableNamesAndValues(vars map[string]interface{}) ([]string, []interface{}) { + names := make([]string, 0, len(vars)) + values := make([]interface{}, 0, len(vars)) -func getVariablesValue(variables map[string]interface{}) []interface{} { - result := make([]interface{}, 0, len(variables)) - for _, variable := range variables { - result = append(result, variable) + for k, v := range vars { + names = append(names, k) + values = append(values, v) } - return result + return names, values } func mergeContextInVars(nodeCtx context.Context, variables map[string]interface{}) error { @@ -128,7 +125,7 @@ func mergeContextInVars(nodeCtx context.Context, variables map[string]interface{ return err } // merge - for k, val := range wfCtx.AsJQVars() { + for k, val := range wfCtx.GetVars() { variables[k] = val } diff --git a/impl/json_pointer.go b/impl/json_pointer.go new file mode 100644 index 0000000..4d276ff --- /dev/null +++ b/impl/json_pointer.go @@ -0,0 +1,77 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "encoding/json" + "fmt" + "github.com/serverlessworkflow/sdk-go/v3/model" + "reflect" + "strings" +) + +func findJsonPointer(data interface{}, target string, path string) (string, bool) { + switch node := data.(type) { + case map[string]interface{}: + for key, value := range node { + newPath := fmt.Sprintf("%s/%s", path, key) + if key == target { + return newPath, true + } + if result, found := findJsonPointer(value, target, newPath); found { + return result, true + } + } + case []interface{}: + for i, item := range node { + newPath := fmt.Sprintf("%s/%d", path, i) + if result, found := findJsonPointer(item, target, newPath); found { + return result, true + } + } + } + return "", false +} + +// GenerateJSONPointer Function to generate JSON Pointer from a Workflow reference +func GenerateJSONPointer(workflow *model.Workflow, targetNode interface{}) (string, error) { + // Convert struct to JSON + jsonData, err := json.Marshal(workflow) + if err != nil { + return "", fmt.Errorf("error marshalling to JSON: %w", err) + } + + // Convert JSON to a generic map for traversal + var jsonMap map[string]interface{} + if err := json.Unmarshal(jsonData, &jsonMap); err != nil { + return "", fmt.Errorf("error unmarshalling JSON: %w", err) + } + + transformedNode := "" + switch node := targetNode.(type) { + case string: + transformedNode = node + default: + transformedNode = strings.ToLower(reflect.TypeOf(targetNode).Name()) + } + + // Search for the target node + jsonPointer, found := findJsonPointer(jsonMap, transformedNode, "") + if !found { + return "", fmt.Errorf("node '%s' not found", targetNode) + } + + return jsonPointer, nil +} diff --git a/model/errors_test.go b/impl/json_pointer_test.go similarity index 53% rename from model/errors_test.go rename to impl/json_pointer_test.go index 12a00fb..76077bc 100644 --- a/model/errors_test.go +++ b/impl/json_pointer_test.go @@ -12,21 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -package model +package impl import ( - "testing" - + "github.com/serverlessworkflow/sdk-go/v3/model" "github.com/stretchr/testify/assert" + "testing" ) // TestGenerateJSONPointer_SimpleTask tests a simple workflow task. func TestGenerateJSONPointer_SimpleTask(t *testing.T) { - workflow := &Workflow{ - Document: Document{Name: "simple-workflow"}, - Do: &TaskList{ - &TaskItem{Key: "task1", Task: &SetTask{Set: map[string]interface{}{"value": 10}}}, - &TaskItem{Key: "task2", Task: &SetTask{Set: map[string]interface{}{"double": "${ .value * 2 }"}}}, + workflow := &model.Workflow{ + Document: model.Document{Name: "simple-workflow"}, + Do: &model.TaskList{ + &model.TaskItem{Key: "task1", Task: &model.SetTask{Set: map[string]interface{}{"value": 10}}}, + &model.TaskItem{Key: "task2", Task: &model.SetTask{Set: map[string]interface{}{"double": "${ .value * 2 }"}}}, }, } @@ -37,11 +37,11 @@ func TestGenerateJSONPointer_SimpleTask(t *testing.T) { // TestGenerateJSONPointer_SimpleTask tests a simple workflow task. func TestGenerateJSONPointer_Document(t *testing.T) { - workflow := &Workflow{ - Document: Document{Name: "simple-workflow"}, - Do: &TaskList{ - &TaskItem{Key: "task1", Task: &SetTask{Set: map[string]interface{}{"value": 10}}}, - &TaskItem{Key: "task2", Task: &SetTask{Set: map[string]interface{}{"double": "${ .value * 2 }"}}}, + workflow := &model.Workflow{ + Document: model.Document{Name: "simple-workflow"}, + Do: &model.TaskList{ + &model.TaskItem{Key: "task1", Task: &model.SetTask{Set: map[string]interface{}{"value": 10}}}, + &model.TaskItem{Key: "task2", Task: &model.SetTask{Set: map[string]interface{}{"double": "${ .value * 2 }"}}}, }, } @@ -51,17 +51,17 @@ func TestGenerateJSONPointer_Document(t *testing.T) { } func TestGenerateJSONPointer_ForkTask(t *testing.T) { - workflow := &Workflow{ - Document: Document{Name: "fork-example"}, - Do: &TaskList{ - &TaskItem{ + workflow := &model.Workflow{ + Document: model.Document{Name: "fork-example"}, + Do: &model.TaskList{ + &model.TaskItem{ Key: "raiseAlarm", - Task: &ForkTask{ - Fork: ForkTaskConfiguration{ + Task: &model.ForkTask{ + Fork: model.ForkTaskConfiguration{ Compete: true, - Branches: &TaskList{ - {Key: "callNurse", Task: &CallHTTP{Call: "http", With: HTTPArguments{Method: "put", Endpoint: NewEndpoint("https://hospital.com/api/alert/nurses")}}}, - {Key: "callDoctor", Task: &CallHTTP{Call: "http", With: HTTPArguments{Method: "put", Endpoint: NewEndpoint("https://hospital.com/api/alert/doctor")}}}, + Branches: &model.TaskList{ + {Key: "callNurse", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/nurses")}}}, + {Key: "callDoctor", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/doctor")}}}, }, }, }, @@ -76,23 +76,23 @@ func TestGenerateJSONPointer_ForkTask(t *testing.T) { // TestGenerateJSONPointer_DeepNestedTask tests multiple nested task levels. func TestGenerateJSONPointer_DeepNestedTask(t *testing.T) { - workflow := &Workflow{ - Document: Document{Name: "deep-nested"}, - Do: &TaskList{ - &TaskItem{ + workflow := &model.Workflow{ + Document: model.Document{Name: "deep-nested"}, + Do: &model.TaskList{ + &model.TaskItem{ Key: "step1", - Task: &ForkTask{ - Fork: ForkTaskConfiguration{ + Task: &model.ForkTask{ + Fork: model.ForkTaskConfiguration{ Compete: false, - Branches: &TaskList{ + Branches: &model.TaskList{ { Key: "branchA", - Task: &ForkTask{ - Fork: ForkTaskConfiguration{ - Branches: &TaskList{ + Task: &model.ForkTask{ + Fork: model.ForkTaskConfiguration{ + Branches: &model.TaskList{ { Key: "deepTask", - Task: &SetTask{Set: map[string]interface{}{"result": "done"}}, + Task: &model.SetTask{Set: map[string]interface{}{"result": "done"}}, }, }, }, @@ -112,10 +112,10 @@ func TestGenerateJSONPointer_DeepNestedTask(t *testing.T) { // TestGenerateJSONPointer_NonExistentTask checks for a task that doesn't exist. func TestGenerateJSONPointer_NonExistentTask(t *testing.T) { - workflow := &Workflow{ - Document: Document{Name: "nonexistent-test"}, - Do: &TaskList{ - &TaskItem{Key: "taskA", Task: &SetTask{Set: map[string]interface{}{"value": 5}}}, + workflow := &model.Workflow{ + Document: model.Document{Name: "nonexistent-test"}, + Do: &model.TaskList{ + &model.TaskItem{Key: "taskA", Task: &model.SetTask{Set: map[string]interface{}{"value": 5}}}, }, } @@ -125,11 +125,11 @@ func TestGenerateJSONPointer_NonExistentTask(t *testing.T) { // TestGenerateJSONPointer_MixedTaskTypes verifies a workflow with different task types. func TestGenerateJSONPointer_MixedTaskTypes(t *testing.T) { - workflow := &Workflow{ - Document: Document{Name: "mixed-tasks"}, - Do: &TaskList{ - &TaskItem{Key: "compute", Task: &SetTask{Set: map[string]interface{}{"result": 42}}}, - &TaskItem{Key: "notify", Task: &CallHTTP{Call: "http", With: HTTPArguments{Method: "post", Endpoint: NewEndpoint("https://api.notify.com")}}}, + workflow := &model.Workflow{ + Document: model.Document{Name: "mixed-tasks"}, + Do: &model.TaskList{ + &model.TaskItem{Key: "compute", Task: &model.SetTask{Set: map[string]interface{}{"result": 42}}}, + &model.TaskItem{Key: "notify", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "post", Endpoint: model.NewEndpoint("https://api.notify.com")}}}, }, } diff --git a/impl/runner.go b/impl/runner.go index f0cfd50..1c9ad8b 100644 --- a/impl/runner.go +++ b/impl/runner.go @@ -19,18 +19,13 @@ import ( "fmt" "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" + "time" ) var _ WorkflowRunner = &workflowRunnerImpl{} var _ TaskSupport = &workflowRunnerImpl{} -type TaskSupport interface { - SetTaskStatus(task string, status ctx.StatusPhase) - GetWorkflowDef() *model.Workflow - SetWorkflowInstanceCtx(value interface{}) - GetContext() context.Context -} - +// WorkflowRunner is the public API to run Workflows type WorkflowRunner interface { GetWorkflowDef() *model.Workflow Run(input interface{}) (output interface{}, err error) @@ -57,6 +52,51 @@ type workflowRunnerImpl struct { RunnerCtx ctx.WorkflowContext } +func (wr *workflowRunnerImpl) RemoveLocalExprVars(keys ...string) { + wr.RunnerCtx.RemoveLocalExprVars(keys...) +} + +func (wr *workflowRunnerImpl) AddLocalExprVars(vars map[string]interface{}) { + wr.RunnerCtx.AddLocalExprVars(vars) +} + +func (wr *workflowRunnerImpl) SetLocalExprVars(vars map[string]interface{}) { + wr.RunnerCtx.SetLocalExprVars(vars) +} + +func (wr *workflowRunnerImpl) SetTaskReferenceFromName(taskName string) error { + ref, err := GenerateJSONPointer(wr.Workflow, taskName) + if err != nil { + return err + } + wr.RunnerCtx.SetTaskReference(ref) + return nil +} + +func (wr *workflowRunnerImpl) GetTaskReference() string { + return wr.RunnerCtx.GetTaskReference() +} + +func (wr *workflowRunnerImpl) SetTaskRawInput(input interface{}) { + wr.RunnerCtx.SetTaskRawInput(input) +} + +func (wr *workflowRunnerImpl) SetTaskRawOutput(output interface{}) { + wr.RunnerCtx.SetTaskRawOutput(output) +} + +func (wr *workflowRunnerImpl) SetTaskDef(task model.Task) error { + return wr.RunnerCtx.SetTaskDef(task) +} + +func (wr *workflowRunnerImpl) SetTaskStartedAt(startedAt time.Time) { + wr.RunnerCtx.SetTaskStartedAt(startedAt) +} + +func (wr *workflowRunnerImpl) SetTaskName(name string) { + wr.RunnerCtx.SetTaskName(name) +} + func (wr *workflowRunnerImpl) GetContext() context.Context { return wr.Context } @@ -82,10 +122,12 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er defer func() { if err != nil { wr.RunnerCtx.SetStatus(ctx.FaultedStatus) - err = wr.wrapWorkflowError(err, "/") + err = wr.wrapWorkflowError(err) } }() + wr.RunnerCtx.SetRawInput(input) + // Process input if input, err = wr.processInput(input); err != nil { return nil, err @@ -98,11 +140,14 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er if err != nil { return nil, err } + wr.RunnerCtx.SetStartedAt(time.Now()) output, err = doRunner.Run(wr.RunnerCtx.GetInput()) if err != nil { return nil, err } + wr.RunnerCtx.ClearTaskContext() + // Process output if output, err = wr.processOutput(output); err != nil { return nil, err @@ -114,11 +159,15 @@ func (wr *workflowRunnerImpl) Run(input interface{}) (output interface{}, err er } // wrapWorkflowError ensures workflow errors have a proper instance reference. -func (wr *workflowRunnerImpl) wrapWorkflowError(err error, taskName string) error { +func (wr *workflowRunnerImpl) wrapWorkflowError(err error) error { + taskReference := wr.RunnerCtx.GetTaskReference() + if len(taskReference) == 0 { + taskReference = "/" + } if knownErr := model.AsError(err); knownErr != nil { - return knownErr.WithInstanceRef(wr.Workflow, taskName) + return knownErr.WithInstanceRef(wr.Workflow, taskReference) } - return model.NewErrRuntime(fmt.Errorf("workflow '%s', task '%s': %w", wr.Workflow.Document.Name, taskName, err), taskName) + return model.NewErrRuntime(fmt.Errorf("workflow '%s', task '%s': %w", wr.Workflow.Document.Name, taskReference, err), taskReference) } // processInput validates and transforms input if needed. diff --git a/impl/task_runner_test.go b/impl/runner_test.go similarity index 89% rename from impl/task_runner_test.go rename to impl/runner_test.go index 4f6ab28..981be36 100644 --- a/impl/task_runner_test.go +++ b/impl/runner_test.go @@ -16,24 +16,29 @@ package impl import ( "context" + "fmt" "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" - "os" - "path/filepath" - "testing" - "github.com/serverlessworkflow/sdk-go/v3/model" "github.com/serverlessworkflow/sdk-go/v3/parser" "github.com/stretchr/testify/assert" + "os" + "path/filepath" + "testing" ) type taskSupportOpts func(*workflowRunnerImpl) // newTaskSupport returns an instance of TaskSupport for test purposes func newTaskSupport(opts ...taskSupportOpts) TaskSupport { + wfCtx, err := ctx.NewWorkflowContext(&model.Workflow{}) + if err != nil { + panic(fmt.Errorf("failed to create workflow context within the test environment: %v", err)) + } + ts := &workflowRunnerImpl{ Workflow: nil, Context: context.TODO(), - RunnerCtx: nil, + RunnerCtx: wfCtx, } // Apply each functional option to ts @@ -308,9 +313,12 @@ func TestWorkflowRunner_Run_YAML_ControlFlow(t *testing.T) { func TestWorkflowRunner_Run_YAML_RaiseTasks(t *testing.T) { // TODO: add $workflow context to the expr processing - //t.Run("Raise Inline Error", func(t *testing.T) { - // runWorkflowTest(t, "./testdata/raise_inline.yaml", nil, nil) - //}) + t.Run("Raise Inline Error", func(t *testing.T) { + runWorkflowWithErr(t, "./testdata/raise_inline.yaml", nil, nil, func(err error) { + assert.Equal(t, model.ErrorTypeValidation, model.AsError(err).Type.String()) + assert.Equal(t, "Invalid input provided to workflow raise-inline", model.AsError(err).Detail.String()) + }) + }) t.Run("Raise Referenced Error", func(t *testing.T) { runWorkflowWithErr(t, "./testdata/raise_reusable.yaml", nil, nil, @@ -354,7 +362,6 @@ func TestWorkflowRunner_Run_YAML_RaiseTasks_ControlFlow(t *testing.T) { } func TestForTaskRunner_Run(t *testing.T) { - t.Skip("Skipping until the For task is implemented - missing JQ variables implementation") t.Run("Simple For with Colors", func(t *testing.T) { workflowPath := "./testdata/for_colors.yaml" input := map[string]interface{}{ @@ -362,8 +369,36 @@ func TestForTaskRunner_Run(t *testing.T) { } expectedOutput := map[string]interface{}{ "processed": map[string]interface{}{ - "colors": []string{"red", "green", "blue"}, - "indexed": []float64{0, 1, 2}, + "colors": []interface{}{"red", "green", "blue"}, + "indexes": []interface{}{0, 1, 2}, + }, + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) + + t.Run("SUM Numbers", func(t *testing.T) { + workflowPath := "./testdata/for_sum_numbers.yaml" + input := map[string]interface{}{ + "numbers": []int32{2, 3, 4}, + } + expectedOutput := map[string]interface{}{ + "result": interface{}(9), + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) + + t.Run("For Nested Loops", func(t *testing.T) { + workflowPath := "./testdata/for_nested_loops.yaml" + input := map[string]interface{}{ + "fruits": []interface{}{"apple", "banana"}, + "colors": []interface{}{"red", "green"}, + } + expectedOutput := map[string]interface{}{ + "matrix": []interface{}{ + []interface{}{"apple", "red"}, + []interface{}{"apple", "green"}, + []interface{}{"banana", "red"}, + []interface{}{"banana", "green"}, }, } runWorkflowTest(t, workflowPath, input, expectedOutput) diff --git a/impl/task_runner.go b/impl/task_runner.go index a64c329..a302bca 100644 --- a/impl/task_runner.go +++ b/impl/task_runner.go @@ -15,245 +15,41 @@ package impl import ( - "fmt" - "reflect" - "strings" - - "github.com/serverlessworkflow/sdk-go/v3/impl/expr" + "context" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" + "time" ) var _ TaskRunner = &SetTaskRunner{} var _ TaskRunner = &RaiseTaskRunner{} var _ TaskRunner = &ForTaskRunner{} +var _ TaskRunner = &DoTaskRunner{} type TaskRunner interface { Run(input interface{}) (interface{}, error) GetTaskName() string } -func NewSetTaskRunner(taskName string, task *model.SetTask, taskSupport TaskSupport) (*SetTaskRunner, error) { - if task == nil || task.Set == nil { - return nil, model.NewErrValidation(fmt.Errorf("no set configuration provided for SetTask %s", taskName), taskName) - } - return &SetTaskRunner{ - Task: task, - TaskName: taskName, - TaskSupport: taskSupport, - }, nil -} - -type SetTaskRunner struct { - Task *model.SetTask - TaskName string - TaskSupport TaskSupport -} - -func (s *SetTaskRunner) GetTaskName() string { - return s.TaskName -} - -func (s *SetTaskRunner) Run(input interface{}) (output interface{}, err error) { - setObject := deepClone(s.Task.Set) - result, err := traverseAndEvaluate(model.NewObjectOrRuntimeExpr(setObject), input, s.TaskName, s.TaskSupport.GetContext()) - if err != nil { - return nil, err - } - - output, ok := result.(map[string]interface{}) - if !ok { - return nil, model.NewErrRuntime(fmt.Errorf("expected output to be a map[string]interface{}, but got a different type. Got: %v", result), s.TaskName) - } - - return output, nil -} - -func NewRaiseTaskRunner(taskName string, task *model.RaiseTask, taskSupport TaskSupport) (*RaiseTaskRunner, error) { - if err := resolveErrorDefinition(task, taskSupport.GetWorkflowDef()); err != nil { - return nil, err - } - - if task.Raise.Error.Definition == nil { - return nil, model.NewErrValidation(fmt.Errorf("no raise configuration provided for RaiseTask %s", taskName), taskName) - } - return &RaiseTaskRunner{ - Task: task, - TaskName: taskName, - TaskSupport: taskSupport, - }, nil -} - -// TODO: can e refactored to a definition resolver callable from the context -func resolveErrorDefinition(t *model.RaiseTask, workflowDef *model.Workflow) error { - if workflowDef != nil && t.Raise.Error.Ref != nil { - notFoundErr := model.NewErrValidation(fmt.Errorf("%v error definition not found in 'uses'", t.Raise.Error.Ref), "") - if workflowDef.Use != nil && workflowDef.Use.Errors != nil { - definition, ok := workflowDef.Use.Errors[*t.Raise.Error.Ref] - if !ok { - return notFoundErr - } - t.Raise.Error.Definition = definition - return nil - } - return notFoundErr - } - return nil -} - -type RaiseTaskRunner struct { - Task *model.RaiseTask - TaskName string - TaskSupport TaskSupport -} - -var raiseErrFuncMapping = map[string]func(error, string) *model.Error{ - model.ErrorTypeAuthentication: model.NewErrAuthentication, - model.ErrorTypeValidation: model.NewErrValidation, - model.ErrorTypeCommunication: model.NewErrCommunication, - model.ErrorTypeAuthorization: model.NewErrAuthorization, - model.ErrorTypeConfiguration: model.NewErrConfiguration, - model.ErrorTypeExpression: model.NewErrExpression, - model.ErrorTypeRuntime: model.NewErrRuntime, - model.ErrorTypeTimeout: model.NewErrTimeout, -} - -func (r *RaiseTaskRunner) Run(input interface{}) (output interface{}, err error) { - output = input - // TODO: make this an external func so we can call it after getting the reference? Or we can get the reference from the workflow definition - var detailResult interface{} - detailResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName, r.TaskSupport.GetContext()) - if err != nil { - return nil, err - } - - var titleResult interface{} - titleResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName, r.TaskSupport.GetContext()) - if err != nil { - return nil, err - } - - instance := &model.JsonPointerOrRuntimeExpression{Value: r.TaskName} - - var raiseErr *model.Error - if raiseErrF, ok := raiseErrFuncMapping[r.Task.Raise.Error.Definition.Type.String()]; ok { - raiseErr = raiseErrF(fmt.Errorf("%v", detailResult), instance.String()) - } else { - raiseErr = r.Task.Raise.Error.Definition - raiseErr.Detail = model.NewStringOrRuntimeExpr(fmt.Sprintf("%v", detailResult)) - raiseErr.Instance = instance - } - - raiseErr.Title = model.NewStringOrRuntimeExpr(fmt.Sprintf("%v", titleResult)) - err = raiseErr - - return output, err -} - -func (r *RaiseTaskRunner) GetTaskName() string { - return r.TaskName -} - -func NewForTaskRunner(taskName string, task *model.ForTask, taskSupport TaskSupport) (*ForTaskRunner, error) { - if task == nil || task.Do == nil { - return nil, model.NewErrValidation(fmt.Errorf("invalid For task %s", taskName), taskName) - } - - doRunner, err := NewDoTaskRunner(task.Do, taskSupport) - if err != nil { - return nil, err - } - - return &ForTaskRunner{ - Task: task, - TaskName: taskName, - DoRunner: doRunner, - TaskSupport: taskSupport, - }, nil -} - -const ( - forTaskDefaultEach = "$item" - forTaskDefaultAt = "$index" -) - -type ForTaskRunner struct { - Task *model.ForTask - TaskName string - DoRunner *DoTaskRunner - TaskSupport TaskSupport -} - -func (f *ForTaskRunner) Run(input interface{}) (interface{}, error) { - f.sanitizeFor() - in, err := expr.TraverseAndEvaluate(f.Task.For.In, input, f.TaskSupport.GetContext()) - if err != nil { - return nil, err - } - - var forOutput interface{} - rv := reflect.ValueOf(in) - switch rv.Kind() { - case reflect.Slice, reflect.Array: - for i := 0; i < rv.Len(); i++ { - item := rv.Index(i).Interface() - - if forOutput, err = f.processForItem(i, item, forOutput); err != nil { - return nil, err - } - } - case reflect.Invalid: - return input, nil - default: - if forOutput, err = f.processForItem(0, in, forOutput); err != nil { - return nil, err - } - } - - return forOutput, nil -} - -func (f *ForTaskRunner) processForItem(idx int, item interface{}, forOutput interface{}) (interface{}, error) { - forInput := map[string]interface{}{ - f.Task.For.At: idx, - f.Task.For.Each: item, - } - if forOutput != nil { - if outputMap, ok := forOutput.(map[string]interface{}); ok { - for key, value := range outputMap { - forInput[key] = value - } - } else { - return nil, fmt.Errorf("task %s item %s at index %d returned a non-json object, impossible to merge context", f.TaskName, f.Task.For.Each, idx) - } - } - var err error - forOutput, err = f.DoRunner.Run(forInput) - if err != nil { - return nil, err - } - - return forOutput, nil -} - -func (f *ForTaskRunner) sanitizeFor() { - f.Task.For.Each = strings.TrimSpace(f.Task.For.Each) - f.Task.For.At = strings.TrimSpace(f.Task.For.At) - - if f.Task.For.Each == "" { - f.Task.For.Each = forTaskDefaultEach - } - if f.Task.For.At == "" { - f.Task.For.At = forTaskDefaultAt - } - - if !strings.HasPrefix(f.Task.For.Each, "$") { - f.Task.For.Each = "$" + f.Task.For.Each - } - if !strings.HasPrefix(f.Task.For.At, "$") { - f.Task.For.At = "$" + f.Task.For.At - } -} - -func (f *ForTaskRunner) GetTaskName() string { - return f.TaskName +type TaskSupport interface { + SetTaskStatus(task string, status ctx.StatusPhase) + GetWorkflowDef() *model.Workflow + // SetWorkflowInstanceCtx is the `$context` variable accessible in JQ expressions and set in `export.as` + SetWorkflowInstanceCtx(value interface{}) + // GetContext gets the sharable Workflow context. Accessible via ctx.GetWorkflowContext. + GetContext() context.Context + SetTaskRawInput(value interface{}) + SetTaskRawOutput(value interface{}) + SetTaskDef(task model.Task) error + SetTaskStartedAt(value time.Time) + SetTaskName(name string) + // SetTaskReferenceFromName based on the taskName and the model.Workflow definition, set the JSON Pointer reference to the context + SetTaskReferenceFromName(taskName string) error + GetTaskReference() string + // SetLocalExprVars overrides local variables in expression processing + SetLocalExprVars(vars map[string]interface{}) + // AddLocalExprVars adds to the local variables in expression processing. Won't override previous entries. + AddLocalExprVars(vars map[string]interface{}) + // RemoveLocalExprVars removes local variables added in AddLocalExprVars or SetLocalExprVars + RemoveLocalExprVars(keys ...string) } diff --git a/impl/task_runner_do.go b/impl/task_runner_do.go index 07c0e29..75249b1 100644 --- a/impl/task_runner_do.go +++ b/impl/task_runner_do.go @@ -17,15 +17,10 @@ package impl import ( "fmt" "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" - - "github.com/serverlessworkflow/sdk-go/v3/impl/expr" "github.com/serverlessworkflow/sdk-go/v3/model" + "time" ) -var _ TaskRunner = &DoTaskRunner{} - -// TODO: refactor to receive a resolver handler instead of the workflow runner - // NewTaskRunner creates a TaskRunner instance based on the task type. func NewTaskRunner(taskName string, task model.Task, taskSupport TaskSupport) (TaskRunner, error) { switch t := task.(type) { @@ -58,15 +53,15 @@ func (d *DoTaskRunner) Run(input interface{}) (output interface{}, err error) { if d.TaskList == nil { return input, nil } - return d.executeTasks(input, d.TaskList) + return d.runTasks(input, d.TaskList) } func (d *DoTaskRunner) GetTaskName() string { return "" } -// executeTasks runs all defined tasks sequentially. -func (d *DoTaskRunner) executeTasks(input interface{}, tasks *model.TaskList) (output interface{}, err error) { +// runTasks runs all defined tasks sequentially. +func (d *DoTaskRunner) runTasks(input interface{}, tasks *model.TaskList) (output interface{}, err error) { output = input if tasks == nil { return output, nil @@ -76,6 +71,13 @@ func (d *DoTaskRunner) executeTasks(input interface{}, tasks *model.TaskList) (o currentTask := (*tasks)[idx] for currentTask != nil { + if err = d.TaskSupport.SetTaskDef(currentTask); err != nil { + return nil, err + } + if err = d.TaskSupport.SetTaskReferenceFromName(currentTask.Key); err != nil { + return nil, err + } + if shouldRun, err := d.shouldRunTask(input, currentTask); err != nil { return output, err } else if !shouldRun { @@ -105,13 +107,11 @@ func (d *DoTaskRunner) executeTasks(input interface{}, tasks *model.TaskList) (o func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (bool, error) { if task.GetBase().If != nil { - output, err := expr.TraverseAndEvaluate(task.GetBase().If.String(), input, d.TaskSupport.GetContext()) + output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, d.TaskSupport.GetContext()) if err != nil { return false, model.NewErrExpression(err, task.Key) } - if result, ok := output.(bool); ok && !result { - return false, nil - } + return output, nil } return true, nil } @@ -120,6 +120,10 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) { taskName := runner.GetTaskName() + d.TaskSupport.SetTaskStartedAt(time.Now()) + d.TaskSupport.SetTaskRawInput(input) + d.TaskSupport.SetTaskName(taskName) + if task.Input != nil { if input, err = d.processTaskInput(task, input, taskName); err != nil { return nil, err @@ -131,6 +135,8 @@ func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model return nil, err } + d.TaskSupport.SetTaskRawOutput(output) + if output, err = d.processTaskOutput(task, output, taskName); err != nil { return nil, err } diff --git a/impl/task_runner_for.go b/impl/task_runner_for.go new file mode 100644 index 0000000..825e7f6 --- /dev/null +++ b/impl/task_runner_for.go @@ -0,0 +1,135 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "fmt" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" + "github.com/serverlessworkflow/sdk-go/v3/model" + "reflect" + "strings" +) + +const ( + forTaskDefaultEach = "$item" + forTaskDefaultAt = "$index" +) + +func NewForTaskRunner(taskName string, task *model.ForTask, taskSupport TaskSupport) (*ForTaskRunner, error) { + if task == nil || task.Do == nil { + return nil, model.NewErrValidation(fmt.Errorf("invalid For task %s", taskName), taskName) + } + + doRunner, err := NewDoTaskRunner(task.Do, taskSupport) + if err != nil { + return nil, err + } + + return &ForTaskRunner{ + Task: task, + TaskName: taskName, + DoRunner: doRunner, + TaskSupport: taskSupport, + }, nil +} + +type ForTaskRunner struct { + Task *model.ForTask + TaskName string + DoRunner *DoTaskRunner + TaskSupport TaskSupport +} + +func (f *ForTaskRunner) Run(input interface{}) (interface{}, error) { + defer func() { + // clear local variables + f.TaskSupport.RemoveLocalExprVars(f.Task.For.Each, f.Task.For.At) + }() + f.sanitizeFor() + in, err := expr.TraverseAndEvaluate(f.Task.For.In, input, f.TaskSupport.GetContext()) + if err != nil { + return nil, err + } + + forOutput := input + rv := reflect.ValueOf(in) + switch rv.Kind() { + case reflect.Slice, reflect.Array: + for i := 0; i < rv.Len(); i++ { + item := rv.Index(i).Interface() + + if forOutput, err = f.processForItem(i, item, forOutput); err != nil { + return nil, err + } + if f.Task.While != "" { + whileIsTrue, err := traverseAndEvaluateBool(f.Task.While, forOutput, f.TaskSupport.GetContext()) + if err != nil { + return nil, err + } + if !whileIsTrue { + break + } + } + } + case reflect.Invalid: + return input, nil + default: + if forOutput, err = f.processForItem(0, in, forOutput); err != nil { + return nil, err + } + } + + return forOutput, nil +} + +func (f *ForTaskRunner) processForItem(idx int, item interface{}, forOutput interface{}) (interface{}, error) { + forVars := map[string]interface{}{ + f.Task.For.At: idx, + f.Task.For.Each: item, + } + // Instead of Set, we Add since other tasks in this very same context might be adding variables to the context + f.TaskSupport.AddLocalExprVars(forVars) + // output from previous iterations are merged together + var err error + forOutput, err = f.DoRunner.Run(forOutput) + if err != nil { + return nil, err + } + + return forOutput, nil +} + +func (f *ForTaskRunner) sanitizeFor() { + f.Task.For.Each = strings.TrimSpace(f.Task.For.Each) + f.Task.For.At = strings.TrimSpace(f.Task.For.At) + + if f.Task.For.Each == "" { + f.Task.For.Each = forTaskDefaultEach + } + if f.Task.For.At == "" { + f.Task.For.At = forTaskDefaultAt + } + + if !strings.HasPrefix(f.Task.For.Each, "$") { + f.Task.For.Each = "$" + f.Task.For.Each + } + if !strings.HasPrefix(f.Task.For.At, "$") { + f.Task.For.At = "$" + f.Task.For.At + } +} + +func (f *ForTaskRunner) GetTaskName() string { + return f.TaskName +} diff --git a/impl/task_runner_raise.go b/impl/task_runner_raise.go new file mode 100644 index 0000000..46014a5 --- /dev/null +++ b/impl/task_runner_raise.go @@ -0,0 +1,105 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "fmt" + "github.com/serverlessworkflow/sdk-go/v3/model" +) + +func NewRaiseTaskRunner(taskName string, task *model.RaiseTask, taskSupport TaskSupport) (*RaiseTaskRunner, error) { + if err := resolveErrorDefinition(task, taskSupport.GetWorkflowDef()); err != nil { + return nil, err + } + + if task.Raise.Error.Definition == nil { + return nil, model.NewErrValidation(fmt.Errorf("no raise configuration provided for RaiseTask %s", taskName), taskName) + } + return &RaiseTaskRunner{ + Task: task, + TaskName: taskName, + TaskSupport: taskSupport, + }, nil +} + +// TODO: can e refactored to a definition resolver callable from the context +func resolveErrorDefinition(t *model.RaiseTask, workflowDef *model.Workflow) error { + if workflowDef != nil && t.Raise.Error.Ref != nil { + notFoundErr := model.NewErrValidation(fmt.Errorf("%v error definition not found in 'uses'", t.Raise.Error.Ref), "") + if workflowDef.Use != nil && workflowDef.Use.Errors != nil { + definition, ok := workflowDef.Use.Errors[*t.Raise.Error.Ref] + if !ok { + return notFoundErr + } + t.Raise.Error.Definition = definition + return nil + } + return notFoundErr + } + return nil +} + +type RaiseTaskRunner struct { + Task *model.RaiseTask + TaskName string + TaskSupport TaskSupport +} + +var raiseErrFuncMapping = map[string]func(error, string) *model.Error{ + model.ErrorTypeAuthentication: model.NewErrAuthentication, + model.ErrorTypeValidation: model.NewErrValidation, + model.ErrorTypeCommunication: model.NewErrCommunication, + model.ErrorTypeAuthorization: model.NewErrAuthorization, + model.ErrorTypeConfiguration: model.NewErrConfiguration, + model.ErrorTypeExpression: model.NewErrExpression, + model.ErrorTypeRuntime: model.NewErrRuntime, + model.ErrorTypeTimeout: model.NewErrTimeout, +} + +func (r *RaiseTaskRunner) Run(input interface{}) (output interface{}, err error) { + output = input + // TODO: make this an external func so we can call it after getting the reference? Or we can get the reference from the workflow definition + var detailResult interface{} + detailResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName, r.TaskSupport.GetContext()) + if err != nil { + return nil, err + } + + var titleResult interface{} + titleResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName, r.TaskSupport.GetContext()) + if err != nil { + return nil, err + } + + instance := r.TaskSupport.GetTaskReference() + + var raiseErr *model.Error + if raiseErrF, ok := raiseErrFuncMapping[r.Task.Raise.Error.Definition.Type.String()]; ok { + raiseErr = raiseErrF(fmt.Errorf("%v", detailResult), instance) + } else { + raiseErr = r.Task.Raise.Error.Definition + raiseErr.Detail = model.NewStringOrRuntimeExpr(fmt.Sprintf("%v", detailResult)) + raiseErr.Instance = &model.JsonPointerOrRuntimeExpression{Value: instance} + } + + raiseErr.Title = model.NewStringOrRuntimeExpr(fmt.Sprintf("%v", titleResult)) + err = raiseErr + + return output, err +} + +func (r *RaiseTaskRunner) GetTaskName() string { + return r.TaskName +} diff --git a/impl/task_runner_raise_test.go b/impl/task_runner_raise_test.go index 1f0af1a..e85ac28 100644 --- a/impl/task_runner_raise_test.go +++ b/impl/task_runner_raise_test.go @@ -17,6 +17,7 @@ package impl import ( "encoding/json" "errors" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "testing" "github.com/serverlessworkflow/sdk-go/v3/model" @@ -39,7 +40,11 @@ func TestRaiseTaskRunner_WithDefinedError(t *testing.T) { }, } - runner, err := NewRaiseTaskRunner("task_raise_defined", raiseTask, newTaskSupport()) + wfCtx, err := ctx.NewWorkflowContext(&model.Workflow{}) + assert.NoError(t, err) + wfCtx.SetTaskReference("task_raise_defined") + + runner, err := NewRaiseTaskRunner("task_raise_defined", raiseTask, newTaskSupport(withRunnerCtx(wfCtx))) assert.NoError(t, err) output, err := runner.Run(input) @@ -93,7 +98,11 @@ func TestRaiseTaskRunner_TimeoutErrorWithExpression(t *testing.T) { }, } - runner, err := NewRaiseTaskRunner("task_raise_timeout_expr", raiseTask, newTaskSupport()) + wfCtx, err := ctx.NewWorkflowContext(&model.Workflow{}) + assert.NoError(t, err) + wfCtx.SetTaskReference("task_raise_timeout_expr") + + runner, err := NewRaiseTaskRunner("task_raise_timeout_expr", raiseTask, newTaskSupport(withRunnerCtx(wfCtx))) assert.NoError(t, err) output, err := runner.Run(input) diff --git a/impl/task_runner_set.go b/impl/task_runner_set.go new file mode 100644 index 0000000..295a5f2 --- /dev/null +++ b/impl/task_runner_set.go @@ -0,0 +1,56 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "fmt" + "github.com/serverlessworkflow/sdk-go/v3/model" +) + +func NewSetTaskRunner(taskName string, task *model.SetTask, taskSupport TaskSupport) (*SetTaskRunner, error) { + if task == nil || task.Set == nil { + return nil, model.NewErrValidation(fmt.Errorf("no set configuration provided for SetTask %s", taskName), taskName) + } + return &SetTaskRunner{ + Task: task, + TaskName: taskName, + TaskSupport: taskSupport, + }, nil +} + +type SetTaskRunner struct { + Task *model.SetTask + TaskName string + TaskSupport TaskSupport +} + +func (s *SetTaskRunner) GetTaskName() string { + return s.TaskName +} + +func (s *SetTaskRunner) Run(input interface{}) (output interface{}, err error) { + setObject := deepClone(s.Task.Set) + result, err := traverseAndEvaluate(model.NewObjectOrRuntimeExpr(setObject), input, s.TaskName, s.TaskSupport.GetContext()) + if err != nil { + return nil, err + } + + output, ok := result.(map[string]interface{}) + if !ok { + return nil, model.NewErrRuntime(fmt.Errorf("expected output to be a map[string]interface{}, but got a different type. Got: %v", result), s.TaskName) + } + + return output, nil +} diff --git a/impl/testdata/for_nested_loops.yaml b/impl/testdata/for_nested_loops.yaml new file mode 100644 index 0000000..4fd0284 --- /dev/null +++ b/impl/testdata/for_nested_loops.yaml @@ -0,0 +1,21 @@ +document: + dsl: '1.0.0' + namespace: for-tests + name: nested-loops + version: '1.0.0' +do: + - outerLoop: + for: + in: ${ .fruits } + each: fruit + at: fruitIdx + do: + - innerLoop: + for: + in: ${ $input.colors } + each: color + at: colorIdx + do: + - combinePair: + set: + matrix: ${ .matrix + [[$fruit, $color]] } diff --git a/impl/testdata/for_sum_numbers.yaml b/impl/testdata/for_sum_numbers.yaml new file mode 100644 index 0000000..75065fa --- /dev/null +++ b/impl/testdata/for_sum_numbers.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.0' + namespace: for-tests + name: sum-numbers + version: '1.0.0' +do: + - sumLoop: + for: + in: ${ .numbers } + do: + - addNumber: + set: + total: ${ .total + $item } + - finalize: + set: + result: ${ .total } diff --git a/impl/testdata/raise_inline.yaml b/impl/testdata/raise_inline.yaml index c464877..940528a 100644 --- a/impl/testdata/raise_inline.yaml +++ b/impl/testdata/raise_inline.yaml @@ -24,4 +24,4 @@ do: type: https://serverlessworkflow.io/spec/1.0.0/errors/validation status: 400 title: Validation Error - detail: ${ "Invalid input provided to workflow '\( $workflow.definition.document.name )'" } + detail: ${ "Invalid input provided to workflow \($workflow.definition.document.name)" } diff --git a/impl/utils.go b/impl/utils.go index 796c64d..20b2360 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -62,3 +62,17 @@ func traverseAndEvaluate(runtimeExpr *model.ObjectOrRuntimeExpr, input interface } return output, nil } + +func traverseAndEvaluateBool(runtimeExpr string, input interface{}, wfCtx context.Context) (bool, error) { + if len(runtimeExpr) == 0 { + return false, nil + } + output, err := expr.TraverseAndEvaluate(runtimeExpr, input, wfCtx) + if err != nil { + return false, nil + } + if result, ok := output.(bool); ok { + return result, nil + } + return false, nil +} diff --git a/model/errors.go b/model/errors.go index eeef71c..9700f17 100644 --- a/model/errors.go +++ b/model/errors.go @@ -18,7 +18,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strings" ) @@ -77,10 +76,10 @@ func (e *Error) WithInstanceRef(workflow *Workflow, taskName string) *Error { } // Generate a JSON pointer reference for the task within the workflow - instance, pointerErr := GenerateJSONPointer(workflow, taskName) - if pointerErr == nil { - e.Instance = &JsonPointerOrRuntimeExpression{Value: instance} - } + //instance, pointerErr := GenerateJSONPointer(workflow, taskName) + //if pointerErr == nil { + // e.Instance = &JsonPointerOrRuntimeExpression{Value: instance} + //} // TODO: log the pointer error return e @@ -268,57 +267,3 @@ func ErrorFromJSON(jsonStr string) (*Error, error) { } // JsonPointer functions - -func findJsonPointer(data interface{}, target string, path string) (string, bool) { - switch node := data.(type) { - case map[string]interface{}: - for key, value := range node { - newPath := fmt.Sprintf("%s/%s", path, key) - if key == target { - return newPath, true - } - if result, found := findJsonPointer(value, target, newPath); found { - return result, true - } - } - case []interface{}: - for i, item := range node { - newPath := fmt.Sprintf("%s/%d", path, i) - if result, found := findJsonPointer(item, target, newPath); found { - return result, true - } - } - } - return "", false -} - -// GenerateJSONPointer Function to generate JSON Pointer from a Workflow reference -func GenerateJSONPointer(workflow *Workflow, targetNode interface{}) (string, error) { - // Convert struct to JSON - jsonData, err := json.Marshal(workflow) - if err != nil { - return "", fmt.Errorf("error marshalling to JSON: %w", err) - } - - // Convert JSON to a generic map for traversal - var jsonMap map[string]interface{} - if err := json.Unmarshal(jsonData, &jsonMap); err != nil { - return "", fmt.Errorf("error unmarshalling JSON: %w", err) - } - - transformedNode := "" - switch node := targetNode.(type) { - case string: - transformedNode = node - default: - transformedNode = strings.ToLower(reflect.TypeOf(targetNode).Name()) - } - - // Search for the target node - jsonPointer, found := findJsonPointer(jsonMap, transformedNode, "") - if !found { - return "", fmt.Errorf("node '%s' not found", targetNode) - } - - return jsonPointer, nil -} From 6bc39eb6a016bcc2ae94bc4b2e3140f8f1ffeeff Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Thu, 27 Mar 2025 16:28:41 -0400 Subject: [PATCH 3/4] Add missing headers Signed-off-by: Ricardo Zanini --- impl/testdata/for_nested_loops.yaml | 14 ++++++++++++++ impl/testdata/for_sum_numbers.yaml | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/impl/testdata/for_nested_loops.yaml b/impl/testdata/for_nested_loops.yaml index 4fd0284..3bef556 100644 --- a/impl/testdata/for_nested_loops.yaml +++ b/impl/testdata/for_nested_loops.yaml @@ -1,3 +1,17 @@ +# Copyright 2025 The Serverless Workflow Specification Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + document: dsl: '1.0.0' namespace: for-tests diff --git a/impl/testdata/for_sum_numbers.yaml b/impl/testdata/for_sum_numbers.yaml index 75065fa..afc81e9 100644 --- a/impl/testdata/for_sum_numbers.yaml +++ b/impl/testdata/for_sum_numbers.yaml @@ -1,3 +1,17 @@ +# Copyright 2025 The Serverless Workflow Specification Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + document: dsl: '1.0.0' namespace: for-tests From 3782d3afec84af7e52360a5c54634e64bc79af8f Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Thu, 27 Mar 2025 16:32:18 -0400 Subject: [PATCH 4/4] Add nolint:unused Signed-off-by: Ricardo Zanini --- impl/runner_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/impl/runner_test.go b/impl/runner_test.go index 981be36..32c9c86 100644 --- a/impl/runner_test.go +++ b/impl/runner_test.go @@ -49,12 +49,14 @@ func newTaskSupport(opts ...taskSupportOpts) TaskSupport { return ts } +//nolint:unused func withWorkflow(wf *model.Workflow) taskSupportOpts { return func(ts *workflowRunnerImpl) { ts.Workflow = wf } } +//nolint:unused func withContext(ctx context.Context) taskSupportOpts { return func(ts *workflowRunnerImpl) { ts.Context = ctx