From 3bb761556e70ce3ba8447fbbe8376e60a24fe600 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini <ricardozanini@gmail.com> Date: Fri, 28 Mar 2025 15:41:39 -0400 Subject: [PATCH 1/2] Fix #233 - Add support to 'switch' task Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> --- README.md | 4 +-- impl/runner_test.go | 48 ++++++++++++++++++++++++++ impl/task_runner_do.go | 44 +++++++++++++++++++++++ impl/testdata/switch_match.yaml | 29 ++++++++++++++++ impl/testdata/switch_with_default.yaml | 29 ++++++++++++++++ model/runtime_expression.go | 8 +++++ 6 files changed, 160 insertions(+), 2 deletions(-) create mode 100644 impl/testdata/switch_match.yaml create mode 100644 impl/testdata/switch_with_default.yaml diff --git a/README.md b/README.md index f05e54c..1a6654e 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ The table below lists the current state of this implementation. This table is a | Task Raise | ✅ | | Task Run | ❌ | | Task Set | ✅ | -| Task Switch | ❌ | +| Task Switch | ✅ | | Task Try | ❌ | | Task Wait | ❌ | | Lifecycle Events | 🟡 | @@ -157,7 +157,7 @@ The table below lists the current state of this implementation. This table is a | AsyncAPI Server | ❌ | | AsyncAPI Outbound Message | ❌ | | AsyncAPI Subscription | ❌ | -| Workflow Definition Reference | ❌ | +| Workflow Definition Reference | ✅ | | Subscription Iterator | ❌ | We love contributions! Our aim is to have a complete implementation to serve as a reference or to become a project on its own to favor the CNCF Ecosystem. diff --git a/impl/runner_test.go b/impl/runner_test.go index 32c9c86..c392dc0 100644 --- a/impl/runner_test.go +++ b/impl/runner_test.go @@ -407,3 +407,51 @@ func TestForTaskRunner_Run(t *testing.T) { }) } + +func TestSwitchTaskRunner_Run(t *testing.T) { + t.Run("Color is red", func(t *testing.T) { + workflowPath := "./testdata/switch_match.yaml" + input := map[string]interface{}{ + "color": "red", + } + expectedOutput := map[string]interface{}{ + "colors": []interface{}{"red"}, + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) + + t.Run("Color is green", func(t *testing.T) { + workflowPath := "./testdata/switch_match.yaml" + input := map[string]interface{}{ + "color": "green", + } + expectedOutput := map[string]interface{}{ + "colors": []interface{}{"green"}, + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) + + t.Run("Color is blue", func(t *testing.T) { + workflowPath := "./testdata/switch_match.yaml" + input := map[string]interface{}{ + "color": "blue", + } + expectedOutput := map[string]interface{}{ + "colors": []interface{}{"blue"}, + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) +} + +func TestSwitchTaskRunner_DefaultCase(t *testing.T) { + t.Run("Color is unknown, should match default", func(t *testing.T) { + workflowPath := "./testdata/switch_with_default.yaml" + input := map[string]interface{}{ + "color": "yellow", + } + expectedOutput := map[string]interface{}{ + "colors": []interface{}{"default"}, + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) +} diff --git a/impl/task_runner_do.go b/impl/task_runner_do.go index 75249b1..16d95fb 100644 --- a/impl/task_runner_do.go +++ b/impl/task_runner_do.go @@ -86,6 +86,24 @@ func (d *DoTaskRunner) runTasks(input interface{}, tasks *model.TaskList) (outpu } d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.PendingStatus) + + // Check if this task is a SwitchTask and handle it + if switchTask, ok := currentTask.Task.(*model.SwitchTask); ok { + flowDirective, err := d.evaluateSwitchTask(input, currentTask.Key, switchTask) + if err != nil { + d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.FaultedStatus) + return output, err + } + d.TaskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus) + + // Process FlowDirective: update idx/currentTask accordingly + idx, currentTask = tasks.KeyAndIndex(flowDirective.Value) + if currentTask == nil { + return nil, fmt.Errorf("flow directive target '%s' not found", flowDirective.Value) + } + continue + } + runner, err := NewTaskRunner(currentTask.Key, currentTask.Task, d.TaskSupport) if err != nil { return output, err @@ -116,6 +134,32 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b return true, nil } +func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, switchTask *model.SwitchTask) (*model.FlowDirective, error) { + var defaultThen *model.FlowDirective + for _, switchItem := range switchTask.Switch { + for _, switchCase := range switchItem { + if switchCase.When == nil { + defaultThen = switchCase.Then + continue + } + result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, d.TaskSupport.GetContext()) + if err != nil { + return nil, model.NewErrExpression(err, taskKey) + } + if result { + if switchCase.Then == nil { + return nil, model.NewErrExpression(fmt.Errorf("missing 'then' directive in matched switch case"), taskKey) + } + return switchCase.Then, nil + } + } + } + if defaultThen != nil { + return defaultThen, nil + } + return nil, model.NewErrExpression(fmt.Errorf("no matching switch case"), taskKey) +} + // runTask executes an individual task. func (d *DoTaskRunner) runTask(input interface{}, runner TaskRunner, task *model.TaskBase) (output interface{}, err error) { taskName := runner.GetTaskName() diff --git a/impl/testdata/switch_match.yaml b/impl/testdata/switch_match.yaml new file mode 100644 index 0000000..f92af55 --- /dev/null +++ b/impl/testdata/switch_match.yaml @@ -0,0 +1,29 @@ +document: + dsl: '1.0.0' + namespace: default + name: switch-match + version: '1.0.0' +do: + - switchColor: + switch: + - red: + when: '.color == "red"' + then: setRed + - green: + when: '.color == "green"' + then: setGreen + - blue: + when: '.color == "blue"' + then: setBlue + - setRed: + set: + colors: '${ .colors + [ "red" ] }' + then: end + - setGreen: + set: + colors: '${ .colors + [ "green" ] }' + then: end + - setBlue: + set: + colors: '${ .colors + [ "blue" ] }' + then: end diff --git a/impl/testdata/switch_with_default.yaml b/impl/testdata/switch_with_default.yaml new file mode 100644 index 0000000..3efb474 --- /dev/null +++ b/impl/testdata/switch_with_default.yaml @@ -0,0 +1,29 @@ +document: + dsl: '1.0.0' + namespace: default + name: switch-with-default + version: '1.0.0' + +do: + - switchColor: + switch: + - red: + when: '.color == "red"' + then: setRed + - green: + when: '.color == "green"' + then: setGreen + - fallback: + then: setDefault + - setRed: + set: + colors: '${ .colors + [ "red" ] }' + then: end + - setGreen: + set: + colors: '${ .colors + [ "green" ] }' + then: end + - setDefault: + set: + colors: '${ .colors + [ "default" ] }' + then: end diff --git a/model/runtime_expression.go b/model/runtime_expression.go index ae04e46..5d9eab5 100644 --- a/model/runtime_expression.go +++ b/model/runtime_expression.go @@ -59,6 +59,14 @@ func IsValidExpr(expression string) bool { return err == nil } +// NormalizeExpr adds ${} to the given string +func NormalizeExpr(expr string) string { + if strings.HasPrefix(expr, "${") { + return expr + } + return fmt.Sprintf("${%s}", expr) +} + // IsValid checks if the RuntimeExpression value is valid, handling both with and without `${}`. func (r *RuntimeExpression) IsValid() bool { return IsValidExpr(r.Value) From dccd6c5fca929366aa84f915b23efae815e96589 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini <ricardozanini@gmail.com> Date: Fri, 28 Mar 2025 15:46:35 -0400 Subject: [PATCH 2/2] Fix headers and linters Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> --- impl/ctx/context.go | 5 +++-- impl/expr/expr.go | 1 + impl/json_pointer.go | 3 ++- impl/json_pointer_test.go | 11 ++++++----- impl/runner.go | 3 ++- impl/runner_test.go | 7 ++++--- impl/task_runner.go | 3 ++- impl/task_runner_do.go | 3 ++- impl/task_runner_for.go | 5 +++-- impl/task_runner_raise.go | 1 + impl/task_runner_raise_test.go | 3 ++- impl/task_runner_set.go | 1 + impl/testdata/switch_match.yaml | 14 ++++++++++++++ impl/testdata/switch_with_default.yaml | 14 ++++++++++++++ impl/utils.go | 1 + model/runtime_expression.go | 3 ++- 16 files changed, 60 insertions(+), 18 deletions(-) diff --git a/impl/ctx/context.go b/impl/ctx/context.go index 1f0d716..f013507 100644 --- a/impl/ctx/context.go +++ b/impl/ctx/context.go @@ -19,10 +19,11 @@ import ( "encoding/json" "errors" "fmt" - "github.com/google/uuid" - "github.com/serverlessworkflow/sdk-go/v3/model" "sync" "time" + + "github.com/google/uuid" + "github.com/serverlessworkflow/sdk-go/v3/model" ) var ErrWorkflowContextNotFound = errors.New("workflow context not found") diff --git a/impl/expr/expr.go b/impl/expr/expr.go index 03d558e..60e2765 100644 --- a/impl/expr/expr.go +++ b/impl/expr/expr.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/itchyny/gojq" "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" diff --git a/impl/json_pointer.go b/impl/json_pointer.go index 4d276ff..dedaaf3 100644 --- a/impl/json_pointer.go +++ b/impl/json_pointer.go @@ -17,9 +17,10 @@ package impl import ( "encoding/json" "fmt" - "github.com/serverlessworkflow/sdk-go/v3/model" "reflect" "strings" + + "github.com/serverlessworkflow/sdk-go/v3/model" ) func findJsonPointer(data interface{}, target string, path string) (string, bool) { diff --git a/impl/json_pointer_test.go b/impl/json_pointer_test.go index 76077bc..aeec1e4 100644 --- a/impl/json_pointer_test.go +++ b/impl/json_pointer_test.go @@ -15,9 +15,10 @@ package impl import ( + "testing" + "github.com/serverlessworkflow/sdk-go/v3/model" "github.com/stretchr/testify/assert" - "testing" ) // TestGenerateJSONPointer_SimpleTask tests a simple workflow task. @@ -60,8 +61,8 @@ func TestGenerateJSONPointer_ForkTask(t *testing.T) { Fork: model.ForkTaskConfiguration{ Compete: true, Branches: &model.TaskList{ - {Key: "callNurse", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/nurses")}}}, - {Key: "callDoctor", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/doctor")}}}, + &model.TaskItem{Key: "callNurse", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/nurses")}}}, + &model.TaskItem{Key: "callDoctor", Task: &model.CallHTTP{Call: "http", With: model.HTTPArguments{Method: "put", Endpoint: model.NewEndpoint("https://hospital.com/api/alert/doctor")}}}, }, }, }, @@ -85,12 +86,12 @@ func TestGenerateJSONPointer_DeepNestedTask(t *testing.T) { Fork: model.ForkTaskConfiguration{ Compete: false, Branches: &model.TaskList{ - { + &model.TaskItem{ Key: "branchA", Task: &model.ForkTask{ Fork: model.ForkTaskConfiguration{ Branches: &model.TaskList{ - { + &model.TaskItem{ Key: "deepTask", Task: &model.SetTask{Set: map[string]interface{}{"result": "done"}}, }, diff --git a/impl/runner.go b/impl/runner.go index 1c9ad8b..5328ee3 100644 --- a/impl/runner.go +++ b/impl/runner.go @@ -17,9 +17,10 @@ package impl import ( "context" "fmt" + "time" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" - "time" ) var _ WorkflowRunner = &workflowRunnerImpl{} diff --git a/impl/runner_test.go b/impl/runner_test.go index c392dc0..9bb599c 100644 --- a/impl/runner_test.go +++ b/impl/runner_test.go @@ -17,13 +17,14 @@ package impl import ( "context" "fmt" + "os" + "path/filepath" + "testing" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" "github.com/serverlessworkflow/sdk-go/v3/parser" "github.com/stretchr/testify/assert" - "os" - "path/filepath" - "testing" ) type taskSupportOpts func(*workflowRunnerImpl) diff --git a/impl/task_runner.go b/impl/task_runner.go index a302bca..6d9069d 100644 --- a/impl/task_runner.go +++ b/impl/task_runner.go @@ -16,9 +16,10 @@ package impl import ( "context" + "time" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" - "time" ) var _ TaskRunner = &SetTaskRunner{} diff --git a/impl/task_runner_do.go b/impl/task_runner_do.go index 16d95fb..81ef374 100644 --- a/impl/task_runner_do.go +++ b/impl/task_runner_do.go @@ -16,9 +16,10 @@ package impl import ( "fmt" + "time" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" - "time" ) // NewTaskRunner creates a TaskRunner instance based on the task type. diff --git a/impl/task_runner_for.go b/impl/task_runner_for.go index 825e7f6..fb7bcff 100644 --- a/impl/task_runner_for.go +++ b/impl/task_runner_for.go @@ -16,10 +16,11 @@ package impl import ( "fmt" - "github.com/serverlessworkflow/sdk-go/v3/impl/expr" - "github.com/serverlessworkflow/sdk-go/v3/model" "reflect" "strings" + + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" + "github.com/serverlessworkflow/sdk-go/v3/model" ) const ( diff --git a/impl/task_runner_raise.go b/impl/task_runner_raise.go index 46014a5..b59f01d 100644 --- a/impl/task_runner_raise.go +++ b/impl/task_runner_raise.go @@ -16,6 +16,7 @@ package impl import ( "fmt" + "github.com/serverlessworkflow/sdk-go/v3/model" ) diff --git a/impl/task_runner_raise_test.go b/impl/task_runner_raise_test.go index e85ac28..0c55f3a 100644 --- a/impl/task_runner_raise_test.go +++ b/impl/task_runner_raise_test.go @@ -17,9 +17,10 @@ package impl import ( "encoding/json" "errors" - "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "testing" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" + "github.com/serverlessworkflow/sdk-go/v3/model" "github.com/stretchr/testify/assert" ) diff --git a/impl/task_runner_set.go b/impl/task_runner_set.go index 295a5f2..fc40e74 100644 --- a/impl/task_runner_set.go +++ b/impl/task_runner_set.go @@ -16,6 +16,7 @@ package impl import ( "fmt" + "github.com/serverlessworkflow/sdk-go/v3/model" ) diff --git a/impl/testdata/switch_match.yaml b/impl/testdata/switch_match.yaml index f92af55..4f913af 100644 --- a/impl/testdata/switch_match.yaml +++ b/impl/testdata/switch_match.yaml @@ -1,3 +1,17 @@ +# Copyright 2025 The Serverless Workflow Specification Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + document: dsl: '1.0.0' namespace: default diff --git a/impl/testdata/switch_with_default.yaml b/impl/testdata/switch_with_default.yaml index 3efb474..8a4f1b9 100644 --- a/impl/testdata/switch_with_default.yaml +++ b/impl/testdata/switch_with_default.yaml @@ -1,3 +1,17 @@ +# Copyright 2025 The Serverless Workflow Specification Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + document: dsl: '1.0.0' namespace: default diff --git a/impl/utils.go b/impl/utils.go index 20b2360..a62559d 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -16,6 +16,7 @@ package impl import ( "context" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" "github.com/serverlessworkflow/sdk-go/v3/model" ) diff --git a/model/runtime_expression.go b/model/runtime_expression.go index 5d9eab5..adef566 100644 --- a/model/runtime_expression.go +++ b/model/runtime_expression.go @@ -17,8 +17,9 @@ package model import ( "encoding/json" "fmt" - "github.com/itchyny/gojq" "strings" + + "github.com/itchyny/gojq" ) // RuntimeExpression represents a runtime expression.