Skip to content

Commit 59d35de

Browse files
committed
add support for parallel workflows
Signed-off-by: mikeee <[email protected]>
1 parent 60e4722 commit 59d35de

File tree

6 files changed

+207
-0
lines changed

6 files changed

+207
-0
lines changed

.github/workflows/validate_examples.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ jobs:
164164
"service",
165165
"socket",
166166
"workflow",
167+
"workflow-parallel",
167168
]
168169
steps:
169170
- name: Check out code onto GOPATH

examples/workflow-parallel/README.md

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Dapr Parallel Workflow Example with go-sdk
2+
3+
## Step
4+
5+
### Prepare
6+
7+
- Dapr installed
8+
9+
### Run Workflow
10+
11+
<!-- STEP
12+
name: Run Workflow
13+
output_match_mode: substring
14+
expected_stdout_lines:
15+
- '== APP == Workflow(s) and activities registered.'
16+
- 'work item listener started'
17+
- '== APP == Processing work item: 9'
18+
- '== APP == Work item 9 processed. Result: 18'
19+
- '== APP == Final result: 90'
20+
- '== APP == workflow status: COMPLETED'
21+
- '== APP == workflow terminated'
22+
- '== APP == workflow purged'
23+
24+
background: true
25+
sleep: 30
26+
-->
27+
28+
```bash
29+
dapr run --app-id workflow-parallel \
30+
--dapr-grpc-port 50001 \
31+
--log-level debug \
32+
--resources-path ./config \
33+
-- go run ./main.go
34+
```
35+
36+
<!-- END_STEP -->
37+
38+
## Result
39+
40+
```
41+
- '== APP == Workflow(s) and activities registered.'
42+
- 'work item listener started'
43+
- '== APP == Processing work item: 9'
44+
- '== APP == Work item 9 processed. Result: 18'
45+
- '== APP == Final result: 90'
46+
- '== APP == workflow status: COMPLETED'
47+
- '== APP == workflow terminated'
48+
- '== APP == workflow purged'
49+
```
50+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: wf-store
5+
spec:
6+
type: state.redis
7+
version: v1
8+
metadata:
9+
- name: redisHost
10+
value: localhost:6379
11+
- name: redisPassword
12+
value: ""
13+
- name: actorStateStore
14+
value: "true"

examples/workflow-parallel/main.go

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/dapr/go-sdk/workflow"
10+
)
11+
12+
func main() {
13+
w, err := workflow.NewWorker()
14+
if err != nil {
15+
log.Fatalf("failed to initialise worker: %v", err)
16+
}
17+
18+
if err := w.RegisterWorkflow(BatchProcessingWorkflow); err != nil {
19+
log.Fatalf("failed to register workflow: %v", err)
20+
}
21+
if err := w.RegisterActivity(GetWorkBatch); err != nil {
22+
log.Fatalf("failed to register activity: %v", err)
23+
}
24+
if err := w.RegisterActivity(ProcessWorkItem); err != nil {
25+
log.Fatalf("failed to register activity: %v", err)
26+
}
27+
if err := w.RegisterActivity(ProcessResults); err != nil {
28+
log.Fatalf("failed to register activity: %v", err)
29+
}
30+
fmt.Println("Workflow(s) and activities registered.")
31+
32+
if err := w.Start(); err != nil {
33+
log.Fatalf("failed to start worker")
34+
}
35+
36+
wfClient, err := workflow.NewClient()
37+
if err != nil {
38+
log.Fatalf("failed to initialise client: %v", err)
39+
}
40+
ctx := context.Background()
41+
id, err := wfClient.ScheduleNewWorkflow(ctx, "BatchProcessingWorkflow", workflow.WithInput(10))
42+
if err != nil {
43+
log.Fatalf("failed to schedule a new workflow: %v", err)
44+
}
45+
46+
metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id)
47+
if err != nil {
48+
log.Fatalf("failed to get workflow: %v", err)
49+
}
50+
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
51+
52+
err = wfClient.TerminateWorkflow(ctx, id)
53+
if err != nil {
54+
log.Fatalf("failed to terminate workflow: %v", err)
55+
}
56+
fmt.Println("workflow terminated")
57+
58+
err = wfClient.PurgeWorkflow(ctx, id)
59+
if err != nil {
60+
log.Fatalf("failed to purge workflow: %v", err)
61+
}
62+
fmt.Println("workflow purged")
63+
}
64+
65+
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
66+
var input int
67+
if err := ctx.GetInput(&input); err != nil {
68+
return 0, err
69+
}
70+
71+
var workBatch []int
72+
if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
73+
return 0, err
74+
}
75+
76+
parallelTasks := workflow.NewTaskSlice(len(workBatch))
77+
for i, workItem := range workBatch {
78+
parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
79+
}
80+
81+
var outputs int
82+
for _, task := range parallelTasks {
83+
var output int
84+
err := task.Await(&output)
85+
if err == nil {
86+
outputs += output
87+
} else {
88+
return 0, err
89+
}
90+
}
91+
92+
if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
93+
return 0, err
94+
}
95+
96+
return 0, nil
97+
}
98+
99+
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
100+
var batchSize int
101+
if err := ctx.GetInput(&batchSize); err != nil {
102+
return 0, err
103+
}
104+
batch := make([]int, batchSize)
105+
for i := 0; i < batchSize; i++ {
106+
batch[i] = i
107+
}
108+
return batch, nil
109+
}
110+
111+
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
112+
var workItem int
113+
if err := ctx.GetInput(&workItem); err != nil {
114+
return 0, err
115+
}
116+
fmt.Printf("Processing work item: %d\n", workItem)
117+
time.Sleep(time.Second * 5)
118+
result := workItem * 2
119+
fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
120+
return result, nil
121+
}
122+
123+
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
124+
var finalResult int
125+
if err := ctx.GetInput(&finalResult); err != nil {
126+
return 0, err
127+
}
128+
fmt.Printf("Final result: %d\n", finalResult)
129+
return finalResult, nil
130+
}

workflow/workflow.go

+7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/microsoft/durabletask-go/api"
22+
"github.com/microsoft/durabletask-go/task"
2223
"google.golang.org/protobuf/types/known/wrapperspb"
2324
)
2425

@@ -119,3 +120,9 @@ func ChildWorkflowInstanceID(instanceID string) callChildWorkflowOption {
119120
return nil
120121
}
121122
}
123+
124+
// NewTaskSlice returns a slice of tasks which can be executed in parallel
125+
func NewTaskSlice(length int) []task.Task {
126+
taskSlice := make([]task.Task, length)
127+
return taskSlice
128+
}

workflow/workflow_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,8 @@ func returnCallChildWorkflowOptions(opts ...callChildWorkflowOption) callChildWo
4848
}
4949
return *options
5050
}
51+
52+
func TestNewTaskSlice(t *testing.T) {
53+
tasks := NewTaskSlice(10)
54+
assert.Len(t, tasks, 10)
55+
}

0 commit comments

Comments
 (0)