diff --git a/.github/workflows/pull_request_labeler.yml b/.github/workflows/pull_request_labeler.yml deleted file mode 100644 index f270294..0000000 --- a/.github/workflows/pull_request_labeler.yml +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2022 The Serverless Workflow Specification Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: "Pull Request Labeler" -on: - - pull_request_target - -jobs: - labeler: - permissions: - contents: read - pull-requests: write - runs-on: ubuntu-latest - steps: - - uses: actions/labeler@v5 \ No newline at end of file diff --git a/Makefile b/Makefile index 767d158..34bfc91 100644 --- a/Makefile +++ b/Makefile @@ -11,12 +11,14 @@ goimports: @goimports -w . lint: - @echo "πŸš€ Running lint..." - @command -v golangci-lint > /dev/null || (echo "πŸš€ Installing golangci-lint..."; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b "${GOPATH}/bin") + @echo "πŸš€ Installing/updating golangci-lint…" + GO111MODULE=on go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest + + @echo "πŸš€ Running lint…" @make addheaders @make goimports @make fmt - @./hack/go-lint.sh ${params} + @$(GOPATH)/bin/golangci-lint run ./... ${params} @echo "βœ… Linting completed!" .PHONY: test diff --git a/impl/ctx/context.go b/impl/ctx/context.go index f013507..ff1d260 100644 --- a/impl/ctx/context.go +++ b/impl/ctx/context.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/serverlessworkflow/sdk-go/v3/impl/utils" + "github.com/google/uuid" "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -71,6 +73,7 @@ type WorkflowContext interface { SetLocalExprVars(vars map[string]interface{}) AddLocalExprVars(vars map[string]interface{}) RemoveLocalExprVars(keys ...string) + Clone() WorkflowContext } // workflowContext holds the necessary data for the workflow execution within the instance. @@ -118,6 +121,38 @@ func GetWorkflowContext(ctx context.Context) (WorkflowContext, error) { return wfCtx, nil } +func (ctx *workflowContext) Clone() WorkflowContext { + ctx.mu.Lock() + defer ctx.mu.Unlock() + + newInput := utils.DeepCloneValue(ctx.input) + newOutput := utils.DeepCloneValue(ctx.output) + + // deep clone each of the maps + newContextMap := utils.DeepClone(ctx.context) + newWorkflowDesc := utils.DeepClone(ctx.workflowDescriptor) + newTaskDesc := utils.DeepClone(ctx.taskDescriptor) + newLocalExprVars := utils.DeepClone(ctx.localExprVars) + + newStatusPhase := append([]StatusPhaseLog(nil), ctx.StatusPhase...) + + newTasksStatusPhase := make(map[string][]StatusPhaseLog, len(ctx.TasksStatusPhase)) + for taskName, logs := range ctx.TasksStatusPhase { + newTasksStatusPhase[taskName] = append([]StatusPhaseLog(nil), logs...) + } + + return &workflowContext{ + input: newInput, + output: newOutput, + context: newContextMap, + workflowDescriptor: newWorkflowDesc, + taskDescriptor: newTaskDesc, + localExprVars: newLocalExprVars, + StatusPhase: newStatusPhase, + TasksStatusPhase: newTasksStatusPhase, + } +} + func (ctx *workflowContext) SetStartedAt(t time.Time) { ctx.mu.Lock() defer ctx.mu.Unlock() diff --git a/impl/expr/expr.go b/impl/expr/expr.go index 60e2765..77faffb 100644 --- a/impl/expr/expr.go +++ b/impl/expr/expr.go @@ -132,3 +132,28 @@ func mergeContextInVars(nodeCtx context.Context, variables map[string]interface{ return nil } + +func TraverseAndEvaluateObj(runtimeExpr *model.ObjectOrRuntimeExpr, input interface{}, taskName string, wfCtx context.Context) (output interface{}, err error) { + if runtimeExpr == nil { + return input, nil + } + output, err = TraverseAndEvaluate(runtimeExpr.AsStringOrMap(), input, wfCtx) + if err != nil { + return nil, model.NewErrExpression(err, taskName) + } + return output, nil +} + +func TraverseAndEvaluateBool(runtimeExpr string, input interface{}, wfCtx context.Context) (bool, error) { + if len(runtimeExpr) == 0 { + return false, nil + } + output, err := TraverseAndEvaluate(runtimeExpr, input, wfCtx) + if err != nil { + return false, nil + } + if result, ok := output.(bool); ok { + return result, nil + } + return false, nil +} diff --git a/impl/runner.go b/impl/runner.go index 362db1b..33d852a 100644 --- a/impl/runner.go +++ b/impl/runner.go @@ -19,6 +19,9 @@ import ( "fmt" "time" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" + "github.com/serverlessworkflow/sdk-go/v3/impl/utils" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -53,6 +56,18 @@ type workflowRunnerImpl struct { RunnerCtx ctx.WorkflowContext } +func (wr *workflowRunnerImpl) CloneWithContext(newCtx context.Context) TaskSupport { + clonedWfCtx := wr.RunnerCtx.Clone() + + ctxWithWf := ctx.WithWorkflowContext(newCtx, clonedWfCtx) + + return &workflowRunnerImpl{ + Workflow: wr.Workflow, + Context: ctxWithWf, + RunnerCtx: clonedWfCtx, + } +} + func (wr *workflowRunnerImpl) RemoveLocalExprVars(keys ...string) { wr.RunnerCtx.RemoveLocalExprVars(keys...) } @@ -175,13 +190,13 @@ func (wr *workflowRunnerImpl) wrapWorkflowError(err error) error { func (wr *workflowRunnerImpl) processInput(input interface{}) (output interface{}, err error) { if wr.Workflow.Input != nil { if wr.Workflow.Input.Schema != nil { - if err = validateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil { + if err = utils.ValidateSchema(input, wr.Workflow.Input.Schema, "/"); err != nil { return nil, err } } if wr.Workflow.Input.From != nil { - output, err = traverseAndEvaluate(wr.Workflow.Input.From, input, "/", wr.Context) + output, err = expr.TraverseAndEvaluateObj(wr.Workflow.Input.From, input, "/", wr.Context) if err != nil { return nil, err } @@ -196,13 +211,13 @@ func (wr *workflowRunnerImpl) processOutput(output interface{}) (interface{}, er if wr.Workflow.Output != nil { if wr.Workflow.Output.As != nil { var err error - output, err = traverseAndEvaluate(wr.Workflow.Output.As, output, "/", wr.Context) + output, err = expr.TraverseAndEvaluateObj(wr.Workflow.Output.As, output, "/", wr.Context) if err != nil { return nil, err } } if wr.Workflow.Output.Schema != nil { - if err := validateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil { + if err := utils.ValidateSchema(output, wr.Workflow.Output.Schema, "/"); err != nil { return nil, err } } diff --git a/impl/runner_test.go b/impl/runner_test.go index 9bb599c..5acdb6b 100644 --- a/impl/runner_test.go +++ b/impl/runner_test.go @@ -456,3 +456,14 @@ func TestSwitchTaskRunner_DefaultCase(t *testing.T) { runWorkflowTest(t, workflowPath, input, expectedOutput) }) } + +func TestForkSimple_NoCompete(t *testing.T) { + t.Run("Create a color array", func(t *testing.T) { + workflowPath := "./testdata/fork_simple.yaml" + input := map[string]interface{}{} + expectedOutput := map[string]interface{}{ + "colors": []interface{}{"red", "blue"}, + } + runWorkflowTest(t, workflowPath, input, expectedOutput) + }) +} diff --git a/impl/task_runner.go b/impl/task_runner.go index ea7b6dd..f825f79 100644 --- a/impl/task_runner.go +++ b/impl/task_runner.go @@ -53,4 +53,7 @@ type TaskSupport interface { AddLocalExprVars(vars map[string]interface{}) // RemoveLocalExprVars removes local variables added in AddLocalExprVars or SetLocalExprVars RemoveLocalExprVars(keys ...string) + // CloneWithContext returns a full clone of this TaskSupport, but using + // the provided context.Context (so deadlines/cancellations propagate). + CloneWithContext(ctx context.Context) TaskSupport } diff --git a/impl/task_runner_do.go b/impl/task_runner_do.go index 0301009..8b63bfc 100644 --- a/impl/task_runner_do.go +++ b/impl/task_runner_do.go @@ -18,6 +18,9 @@ import ( "fmt" "time" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" + "github.com/serverlessworkflow/sdk-go/v3/impl/utils" + "github.com/serverlessworkflow/sdk-go/v3/impl/ctx" "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -35,6 +38,8 @@ func NewTaskRunner(taskName string, task model.Task, workflowDef *model.Workflow return NewForTaskRunner(taskName, t) case *model.CallHTTP: return NewCallHttpRunner(taskName, t) + case *model.ForkTask: + return NewForkTaskRunner(taskName, t, workflowDef) default: return nil, fmt.Errorf("unsupported task type '%T' for task '%s'", t, taskName) } @@ -117,7 +122,7 @@ func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (out } taskSupport.SetTaskStatus(currentTask.Key, ctx.CompletedStatus) - input = deepCloneValue(output) + input = utils.DeepCloneValue(output) idx, currentTask = d.TaskList.Next(idx) } @@ -126,7 +131,7 @@ func (d *DoTaskRunner) runTasks(input interface{}, taskSupport TaskSupport) (out func (d *DoTaskRunner) shouldRunTask(input interface{}, taskSupport TaskSupport, task *model.TaskItem) (bool, error) { if task.GetBase().If != nil { - output, err := traverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext()) + output, err := expr.TraverseAndEvaluateBool(task.GetBase().If.String(), input, taskSupport.GetContext()) if err != nil { return false, model.NewErrExpression(err, task.Key) } @@ -143,7 +148,7 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskSupport TaskSup defaultThen = switchCase.Then continue } - result, err := traverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext()) + result, err := expr.TraverseAndEvaluateBool(model.NormalizeExpr(switchCase.When.String()), input, taskSupport.GetContext()) if err != nil { return nil, model.NewErrExpression(err, taskKey) } @@ -199,11 +204,11 @@ func (d *DoTaskRunner) processTaskInput(task *model.TaskBase, taskInput interfac return taskInput, nil } - if err = validateSchema(taskInput, task.Input.Schema, taskName); err != nil { + if err = utils.ValidateSchema(taskInput, task.Input.Schema, taskName); err != nil { return nil, err } - if output, err = traverseAndEvaluate(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil { + if output, err = expr.TraverseAndEvaluateObj(task.Input.From, taskInput, taskName, taskSupport.GetContext()); err != nil { return nil, err } @@ -216,11 +221,11 @@ func (d *DoTaskRunner) processTaskOutput(task *model.TaskBase, taskOutput interf return taskOutput, nil } - if output, err = traverseAndEvaluate(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil { + if output, err = expr.TraverseAndEvaluateObj(task.Output.As, taskOutput, taskName, taskSupport.GetContext()); err != nil { return nil, err } - if err = validateSchema(output, task.Output.Schema, taskName); err != nil { + if err = utils.ValidateSchema(output, task.Output.Schema, taskName); err != nil { return nil, err } @@ -232,12 +237,12 @@ func (d *DoTaskRunner) processTaskExport(task *model.TaskBase, taskOutput interf return nil } - output, err := traverseAndEvaluate(task.Export.As, taskOutput, taskName, taskSupport.GetContext()) + output, err := expr.TraverseAndEvaluateObj(task.Export.As, taskOutput, taskName, taskSupport.GetContext()) if err != nil { return err } - if err = validateSchema(output, task.Export.Schema, taskName); err != nil { + if err = utils.ValidateSchema(output, task.Export.Schema, taskName); err != nil { return nil } diff --git a/impl/task_runner_for.go b/impl/task_runner_for.go index a53348d..90461f9 100644 --- a/impl/task_runner_for.go +++ b/impl/task_runner_for.go @@ -73,7 +73,7 @@ func (f *ForTaskRunner) Run(input interface{}, taskSupport TaskSupport) (interfa return nil, err } if f.Task.While != "" { - whileIsTrue, err := traverseAndEvaluateBool(f.Task.While, forOutput, taskSupport.GetContext()) + whileIsTrue, err := expr.TraverseAndEvaluateBool(f.Task.While, forOutput, taskSupport.GetContext()) if err != nil { return nil, err } diff --git a/impl/task_runner_fork.go b/impl/task_runner_fork.go new file mode 100644 index 0000000..9a68399 --- /dev/null +++ b/impl/task_runner_fork.go @@ -0,0 +1,120 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "context" + "fmt" + "sync" + + "github.com/serverlessworkflow/sdk-go/v3/model" +) + +func NewForkTaskRunner(taskName string, task *model.ForkTask, workflowDef *model.Workflow) (*ForkTaskRunner, error) { + if task == nil || task.Fork.Branches == nil { + return nil, model.NewErrValidation(fmt.Errorf("invalid Fork task %s", taskName), taskName) + } + + var runners []TaskRunner + for _, branchItem := range *task.Fork.Branches { + r, err := NewTaskRunner(branchItem.Key, branchItem.Task, workflowDef) + if err != nil { + return nil, err + } + runners = append(runners, r) + } + + return &ForkTaskRunner{ + Task: task, + TaskName: taskName, + BranchRunners: runners, + }, nil +} + +type ForkTaskRunner struct { + Task *model.ForkTask + TaskName string + BranchRunners []TaskRunner +} + +func (f ForkTaskRunner) GetTaskName() string { + return f.TaskName +} + +func (f ForkTaskRunner) Run(input interface{}, parentSupport TaskSupport) (interface{}, error) { + cancelCtx, cancel := context.WithCancel(parentSupport.GetContext()) + defer cancel() + + n := len(f.BranchRunners) + results := make([]interface{}, n) + errs := make(chan error, n) + done := make(chan struct{}) + resultCh := make(chan interface{}, 1) + + var ( + wg sync.WaitGroup + once sync.Once // <-- declare a Once + ) + + for i, runner := range f.BranchRunners { + wg.Add(1) + go func(i int, runner TaskRunner) { + defer wg.Done() + // **Isolate context** for each branch! + branchSupport := parentSupport.CloneWithContext(cancelCtx) + + select { + case <-cancelCtx.Done(): + return + default: + } + + out, err := runner.Run(input, branchSupport) + if err != nil { + errs <- err + return + } + results[i] = out + + if f.Task.Fork.Compete { + select { + case resultCh <- out: + once.Do(func() { + cancel() // **signal cancellation** to all other branches + close(done) // signal we have a winner + }) + default: + } + } + }(i, runner) + } + + if f.Task.Fork.Compete { + select { + case <-done: + return <-resultCh, nil + case err := <-errs: + return nil, err + } + } + + wg.Wait() + select { + case err := <-errs: + return nil, err + default: + } + return results, nil +} diff --git a/impl/task_runner_fork_test.go b/impl/task_runner_fork_test.go new file mode 100644 index 0000000..f38b817 --- /dev/null +++ b/impl/task_runner_fork_test.go @@ -0,0 +1,101 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package impl + +import ( + "context" + "testing" + "time" + + "github.com/serverlessworkflow/sdk-go/v3/model" + "github.com/stretchr/testify/assert" +) + +// dummyRunner simulates a TaskRunner that returns its name after an optional delay. +type dummyRunner struct { + name string + delay time.Duration +} + +func (d *dummyRunner) GetTaskName() string { + return d.name +} + +func (d *dummyRunner) Run(input interface{}, ts TaskSupport) (interface{}, error) { + select { + case <-ts.GetContext().Done(): + // canceled + return nil, ts.GetContext().Err() + case <-time.After(d.delay): + // complete after delay + return d.name, nil + } +} + +func TestForkTaskRunner_NonCompete(t *testing.T) { + // Prepare a TaskSupport with a background context + ts := newTaskSupport(withContext(context.Background())) + + // Two branches that complete immediately + branches := []TaskRunner{ + &dummyRunner{name: "r1", delay: 0}, + &dummyRunner{name: "r2", delay: 0}, + } + fork := ForkTaskRunner{ + Task: &model.ForkTask{ + Fork: model.ForkTaskConfiguration{ + Compete: false, + }, + }, + TaskName: "fork", + BranchRunners: branches, + } + + output, err := fork.Run("in", ts) + assert.NoError(t, err) + + results, ok := output.([]interface{}) + assert.True(t, ok, "expected output to be []interface{}") + assert.Equal(t, []interface{}{"r1", "r2"}, results) +} + +func TestForkTaskRunner_Compete(t *testing.T) { + // Prepare a TaskSupport with a background context + ts := newTaskSupport(withContext(context.Background())) + + // One fast branch and one slow branch + branches := []TaskRunner{ + &dummyRunner{name: "fast", delay: 10 * time.Millisecond}, + &dummyRunner{name: "slow", delay: 50 * time.Millisecond}, + } + fork := ForkTaskRunner{ + Task: &model.ForkTask{ + Fork: model.ForkTaskConfiguration{ + Compete: true, + }, + }, + TaskName: "fork", + BranchRunners: branches, + } + + start := time.Now() + output, err := fork.Run("in", ts) + elapsed := time.Since(start) + + assert.NoError(t, err) + assert.Equal(t, "fast", output) + // ensure compete returns before the slow branch would finish + assert.Less(t, elapsed, 50*time.Millisecond, "compete should cancel the slow branch") +} diff --git a/impl/task_runner_raise.go b/impl/task_runner_raise.go index 0de588f..dddaf0c 100644 --- a/impl/task_runner_raise.go +++ b/impl/task_runner_raise.go @@ -17,6 +17,7 @@ package impl import ( "fmt" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -71,13 +72,13 @@ func (r *RaiseTaskRunner) Run(input interface{}, taskSupport TaskSupport) (outpu output = input // TODO: make this an external func so we can call it after getting the reference? Or we can get the reference from the workflow definition var detailResult interface{} - detailResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName, taskSupport.GetContext()) + detailResult, err = expr.TraverseAndEvaluateObj(r.Task.Raise.Error.Definition.Detail.AsObjectOrRuntimeExpr(), input, r.TaskName, taskSupport.GetContext()) if err != nil { return nil, err } var titleResult interface{} - titleResult, err = traverseAndEvaluate(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName, taskSupport.GetContext()) + titleResult, err = expr.TraverseAndEvaluateObj(r.Task.Raise.Error.Definition.Title.AsObjectOrRuntimeExpr(), input, r.TaskName, taskSupport.GetContext()) if err != nil { return nil, err } diff --git a/impl/task_runner_set.go b/impl/task_runner_set.go index 40ff185..f2aaaa9 100644 --- a/impl/task_runner_set.go +++ b/impl/task_runner_set.go @@ -17,6 +17,9 @@ package impl import ( "fmt" + "github.com/serverlessworkflow/sdk-go/v3/impl/expr" + "github.com/serverlessworkflow/sdk-go/v3/impl/utils" + "github.com/serverlessworkflow/sdk-go/v3/model" ) @@ -40,8 +43,8 @@ func (s *SetTaskRunner) GetTaskName() string { } func (s *SetTaskRunner) Run(input interface{}, taskSupport TaskSupport) (output interface{}, err error) { - setObject := deepClone(s.Task.Set) - result, err := traverseAndEvaluate(model.NewObjectOrRuntimeExpr(setObject), input, s.TaskName, taskSupport.GetContext()) + setObject := utils.DeepClone(s.Task.Set) + result, err := expr.TraverseAndEvaluateObj(model.NewObjectOrRuntimeExpr(setObject), input, s.TaskName, taskSupport.GetContext()) if err != nil { return nil, err } diff --git a/impl/task_set_test.go b/impl/task_runner_set_test.go similarity index 100% rename from impl/task_set_test.go rename to impl/task_runner_set_test.go diff --git a/.github/labeler.yml b/impl/testdata/fork_simple.yaml similarity index 51% rename from .github/labeler.yml rename to impl/testdata/fork_simple.yaml index 49abd17..044b1e2 100644 --- a/.github/labeler.yml +++ b/impl/testdata/fork_simple.yaml @@ -1,10 +1,10 @@ -# Copyright 2022 The Serverless Workflow Specification Authors +# Copyright 2025 The Serverless Workflow Specification Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -12,9 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -"documentation :notebook:": - - changed-files: - - any-glob-to-any-file: ['contrib/*', '**/*.md'] -kubernetes: - - changed-files: - - any-glob-to-any-file: ['kubernetes/*', 'hack/builder-gen.sh', 'hack/deepcopy-gen.sh', 'Makefile'] +document: + dsl: '1.0.0' + namespace: test + name: fork-example + version: '0.1.0' +do: + - branchColors: + fork: + compete: false + branches: + - setRed: + set: + color1: red + - setBlue: + set: + color2: blue + - joinResult: + set: + colors: "${ [.[] | .[]] }" diff --git a/impl/utils.go b/impl/utils.go deleted file mode 100644 index a62559d..0000000 --- a/impl/utils.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2025 The Serverless Workflow Specification Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package impl - -import ( - "context" - - "github.com/serverlessworkflow/sdk-go/v3/impl/expr" - "github.com/serverlessworkflow/sdk-go/v3/model" -) - -// Deep clone a map to avoid modifying the original object -func deepClone(obj map[string]interface{}) map[string]interface{} { - clone := make(map[string]interface{}) - for key, value := range obj { - clone[key] = deepCloneValue(value) - } - return clone -} - -func deepCloneValue(value interface{}) interface{} { - if m, ok := value.(map[string]interface{}); ok { - return deepClone(m) - } - if s, ok := value.([]interface{}); ok { - clonedSlice := make([]interface{}, len(s)) - for i, v := range s { - clonedSlice[i] = deepCloneValue(v) - } - return clonedSlice - } - return value -} - -func validateSchema(data interface{}, schema *model.Schema, taskName string) error { - if schema != nil { - if err := ValidateJSONSchema(data, schema); err != nil { - return model.NewErrValidation(err, taskName) - } - } - return nil -} - -func traverseAndEvaluate(runtimeExpr *model.ObjectOrRuntimeExpr, input interface{}, taskName string, wfCtx context.Context) (output interface{}, err error) { - if runtimeExpr == nil { - return input, nil - } - output, err = expr.TraverseAndEvaluate(runtimeExpr.AsStringOrMap(), input, wfCtx) - if err != nil { - return nil, model.NewErrExpression(err, taskName) - } - return output, nil -} - -func traverseAndEvaluateBool(runtimeExpr string, input interface{}, wfCtx context.Context) (bool, error) { - if len(runtimeExpr) == 0 { - return false, nil - } - output, err := expr.TraverseAndEvaluate(runtimeExpr, input, wfCtx) - if err != nil { - return false, nil - } - if result, ok := output.(bool); ok { - return result, nil - } - return false, nil -} diff --git a/impl/json_schema.go b/impl/utils/json_schema.go similarity index 84% rename from impl/json_schema.go rename to impl/utils/json_schema.go index 396f9f5..9b91553 100644 --- a/impl/json_schema.go +++ b/impl/utils/json_schema.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package impl +package utils import ( "encoding/json" @@ -23,8 +23,8 @@ import ( "github.com/xeipuuv/gojsonschema" ) -// ValidateJSONSchema validates the provided data against a model.Schema. -func ValidateJSONSchema(data interface{}, schema *model.Schema) error { +// validateJSONSchema validates the provided data against a model.Schema. +func validateJSONSchema(data interface{}, schema *model.Schema) error { if schema == nil { return nil } @@ -68,3 +68,12 @@ func ValidateJSONSchema(data interface{}, schema *model.Schema) error { return nil } + +func ValidateSchema(data interface{}, schema *model.Schema, taskName string) error { + if schema != nil { + if err := validateJSONSchema(data, schema); err != nil { + return model.NewErrValidation(err, taskName) + } + } + return nil +} diff --git a/impl/utils/utils.go b/impl/utils/utils.go new file mode 100644 index 0000000..f444139 --- /dev/null +++ b/impl/utils/utils.go @@ -0,0 +1,38 @@ +// Copyright 2025 The Serverless Workflow Specification Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +// DeepClone a map to avoid modifying the original object +func DeepClone(obj map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}) + for key, value := range obj { + clone[key] = DeepCloneValue(value) + } + return clone +} + +func DeepCloneValue(value interface{}) interface{} { + if m, ok := value.(map[string]interface{}); ok { + return DeepClone(m) + } + if s, ok := value.([]interface{}); ok { + clonedSlice := make([]interface{}, len(s)) + for i, v := range s { + clonedSlice[i] = DeepCloneValue(v) + } + return clonedSlice + } + return value +}