Skip to content

Commit e813705

Browse files
committed
Add partial 'For' implementation
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 731d02e commit e813705

File tree

5 files changed

+114
-2
lines changed

5 files changed

+114
-2
lines changed

expr/expr.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ func TraverseAndEvaluate(node interface{}, input interface{}) (interface{}, erro
8686
}
8787
}
8888

89+
// TODO: add support to variables see https://github.com/itchyny/gojq/blob/main/option_variables_test.go
90+
8991
// evaluateJQExpression evaluates a jq expression against a given JSON input
9092
func evaluateJQExpression(expression string, input interface{}) (interface{}, error) {
9193
// Parse the sanitized jq expression

impl/task_runner.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package impl
1616

1717
import (
1818
"fmt"
19+
"reflect"
20+
"strings"
1921

2022
"github.com/serverlessworkflow/sdk-go/v3/expr"
2123
"github.com/serverlessworkflow/sdk-go/v3/model"
@@ -163,14 +165,86 @@ func NewForTaskRunner(taskName string, task *model.ForTask, taskSupport TaskSupp
163165
}, nil
164166
}
165167

168+
const (
169+
forTaskDefaultEach = "$item"
170+
forTaskDefaultAt = "$index"
171+
)
172+
166173
type ForTaskRunner struct {
167174
Task *model.ForTask
168175
TaskName string
169176
DoRunner *DoTaskRunner
170177
}
171178

172179
func (f *ForTaskRunner) Run(input interface{}) (interface{}, error) {
173-
return input, nil
180+
f.sanitizeFor()
181+
in, err := expr.TraverseAndEvaluate(f.Task.For.In, input)
182+
if err != nil {
183+
return nil, err
184+
}
185+
186+
var forOutput interface{}
187+
rv := reflect.ValueOf(in)
188+
switch rv.Kind() {
189+
case reflect.Slice, reflect.Array:
190+
for i := 0; i < rv.Len(); i++ {
191+
item := rv.Index(i).Interface()
192+
193+
if forOutput, err = f.processForItem(i, item, forOutput); err != nil {
194+
return nil, err
195+
}
196+
}
197+
case reflect.Invalid:
198+
return input, nil
199+
default:
200+
if forOutput, err = f.processForItem(0, in, forOutput); err != nil {
201+
return nil, err
202+
}
203+
}
204+
205+
return forOutput, nil
206+
}
207+
208+
func (f *ForTaskRunner) processForItem(idx int, item interface{}, forOutput interface{}) (interface{}, error) {
209+
forInput := map[string]interface{}{
210+
f.Task.For.At: idx,
211+
f.Task.For.Each: item,
212+
}
213+
if forOutput != nil {
214+
if outputMap, ok := forOutput.(map[string]interface{}); ok {
215+
for key, value := range outputMap {
216+
forInput[key] = value
217+
}
218+
} else {
219+
return nil, fmt.Errorf("task %s item %s at index %d returned a non-json object, impossible to merge context", f.TaskName, f.Task.For.Each, idx)
220+
}
221+
}
222+
var err error
223+
forOutput, err = f.DoRunner.Run(forInput)
224+
if err != nil {
225+
return nil, err
226+
}
227+
228+
return forOutput, nil
229+
}
230+
231+
func (f *ForTaskRunner) sanitizeFor() {
232+
f.Task.For.Each = strings.TrimSpace(f.Task.For.Each)
233+
f.Task.For.At = strings.TrimSpace(f.Task.For.At)
234+
235+
if f.Task.For.Each == "" {
236+
f.Task.For.Each = forTaskDefaultEach
237+
}
238+
if f.Task.For.At == "" {
239+
f.Task.For.At = forTaskDefaultAt
240+
}
241+
242+
if !strings.HasPrefix(f.Task.For.Each, "$") {
243+
f.Task.For.Each = "$" + f.Task.For.Each
244+
}
245+
if !strings.HasPrefix(f.Task.For.At, "$") {
246+
f.Task.For.At = "$" + f.Task.For.At
247+
}
174248
}
175249

176250
func (f *ForTaskRunner) GetTaskName() string {

impl/task_runner_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,5 +309,22 @@ func TestWorkflowRunner_Run_YAML_RaiseTasks_ControlFlow(t *testing.T) {
309309
assert.Equal(t, "User is under the required age", model.AsError(err).Detail.String())
310310
})
311311
})
312+
}
313+
314+
func TestForTaskRunner_Run(t *testing.T) {
315+
t.Skip("Skipping until the For task is implemented - missing JQ variables implementation")
316+
t.Run("Simple For with Colors", func(t *testing.T) {
317+
workflowPath := "./testdata/for_colors.yaml"
318+
input := map[string]interface{}{
319+
"colors": []string{"red", "green", "blue"},
320+
}
321+
expectedOutput := map[string]interface{}{
322+
"processed": map[string]interface{}{
323+
"colors": []string{"red", "green", "blue"},
324+
"indexed": []float64{0, 1, 2},
325+
},
326+
}
327+
runWorkflowTest(t, workflowPath, input, expectedOutput)
328+
})
312329

313330
}

impl/testdata/for_colors.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: default
4+
name: for
5+
version: '1.0.0'
6+
do:
7+
- loopColors:
8+
for:
9+
each: color
10+
in: '${ .colors }'
11+
do:
12+
- markProcessed:
13+
set:
14+
processed: '${ { colors: (.processed.colors + [ $color ]), indexes: (.processed.indexes + [ $index ])} }'

model/task.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ func unmarshalTask(key string, taskRaw json.RawMessage) (Task, error) {
143143
return nil, fmt.Errorf("failed to parse task type for key '%s': %w", key, err)
144144
}
145145

146+
// TODO: not the most elegant; can be improved in a smarter way
147+
146148
// Determine task type
147149
var task Task
148150
if callValue, hasCall := taskType["call"].(string); hasCall {
@@ -154,8 +156,11 @@ func unmarshalTask(key string, taskRaw json.RawMessage) (Task, error) {
154156
// Default to CallFunction for unrecognized call values
155157
task = &CallFunction{}
156158
}
159+
} else if _, hasFor := taskType["for"]; hasFor {
160+
// Handle special case "for" that also has "do"
161+
task = taskTypeRegistry["for"]()
157162
} else {
158-
// Handle non-call tasks (e.g., "do", "fork")
163+
// Handle everything else (e.g., "do", "fork")
159164
for typeKey := range taskType {
160165
if constructor, exists := taskTypeRegistry[typeKey]; exists {
161166
task = constructor()

0 commit comments

Comments
 (0)