Skip to content

Commit 947414d

Browse files
committed
feat: initial wfclient implementation
Signed-off-by: mikeee <[email protected]>
1 parent a55d81d commit 947414d

File tree

6 files changed

+287
-3
lines changed

6 files changed

+287
-3
lines changed

Diff for: examples/workflow/README.md

+19
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ expected_stdout_lines:
2828
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
2929
- '== APP == workflow terminated'
3030
- '== APP == workflow purged'
31+
- '== APP == workflow client test'
32+
- '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
33+
- '== APP == [wfclient] workflow running'
34+
- '== APP == [wfclient] stage: 1'
35+
- '== APP == [wfclient] event raised'
36+
- '== APP == [wfclient] stage: 2'
37+
- '== APP == [wfclient] workflow terminated'
38+
- '== APP == [wfclient] workflow purged'
39+
- '== APP == workflow runtime successfully shutdown'
40+
3141
background: true
3242
sleep: 60
3343
-->
@@ -61,4 +71,13 @@ dapr run --app-id workflow \
6171
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
6272
- '== APP == workflow terminated'
6373
- '== APP == workflow purged'
74+
- '== APP == workflow client test'
75+
- '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
76+
- '== APP == [wfclient] workflow running'
77+
- '== APP == [wfclient] stage: 1'
78+
- '== APP == [wfclient] event raised'
79+
- '== APP == [wfclient] stage: 2'
80+
- '== APP == [wfclient] workflow terminated'
81+
- '== APP == [wfclient] workflow purged'
82+
- '== APP == workflow runtime successfully shutdown'
6483
```

Diff for: examples/workflow/main.go

+66
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,77 @@ func main() {
222222

223223
fmt.Println("workflow purged")
224224

225+
// WFClient
226+
// TODO: Expand client validation
227+
228+
stage = 0
229+
fmt.Println("workflow client test")
230+
231+
wfClient, err := workflow.NewClient()
232+
if err != nil {
233+
log.Fatalf("[wfclient] faield to initialize: %v", err)
234+
}
235+
236+
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
237+
if err != nil {
238+
log.Fatalf("[wfclient] failed to start workflow: %v", err)
239+
}
240+
241+
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
242+
243+
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
244+
if err != nil {
245+
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
246+
}
247+
248+
fmt.Printf("[wfclient] workflow running: %v\n", metadata.IsRunning())
249+
250+
if stage != 1 {
251+
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
252+
}
253+
254+
fmt.Printf("[wfclient] stage: %d\n", stage)
255+
256+
// TODO: WaitForWorkflowStart
257+
// TODO: WaitForWorkflowCompletion
258+
259+
// raise event
260+
261+
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
262+
log.Fatalf("[wfclient] failed to raise event: %v", err)
263+
}
264+
265+
fmt.Println("[wfclient] event raised")
266+
267+
// Sleep to allow the workflow to advance
268+
time.Sleep(time.Second)
269+
270+
if stage != 2 {
271+
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
272+
}
273+
274+
fmt.Printf("[wfclient] stage: %d\n", stage)
275+
276+
// stop workflow
277+
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
278+
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
279+
}
280+
281+
fmt.Println("[wfclient] workflow terminated")
282+
283+
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
284+
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
285+
}
286+
287+
fmt.Println("[wfclient] workflow purged")
288+
225289
// stop workflow runtime
226290
if err := wr.Shutdown(); err != nil {
227291
log.Fatalf("failed to shutdown runtime: %v", err)
228292
}
229293

294+
fmt.Println("workflow runtime successfully shutdown")
295+
230296
time.Sleep(time.Second * 5)
231297
}
232298

Diff for: workflow/client.go

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package workflow
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/microsoft/durabletask-go/api"
9+
"github.com/microsoft/durabletask-go/backend"
10+
durabletaskclient "github.com/microsoft/durabletask-go/client"
11+
12+
dapr "github.com/dapr/go-sdk/client"
13+
)
14+
15+
type Client interface {
16+
ScheduleNewWorkflow(ctx context.Context) (string, error)
17+
FetchWorkflowMetadata(ctx context.Context) (string, error)
18+
WaitForWorkflowStart(ctx context.Context) (string, error)
19+
WaitForWorkflowCompletion(ctx context.Context) (string, error)
20+
TerminateWorkflow(ctx context.Context) error
21+
RaiseEvent(ctx context.Context) error
22+
SuspendWorkflow(ctx context.Context) error
23+
ResumeWorkflow(ctx context.Context) error
24+
PurgeWorkflow(ctx context.Context) error
25+
}
26+
27+
type client struct {
28+
taskHubClient *durabletaskclient.TaskHubGrpcClient
29+
}
30+
31+
func WithInstanceID(id string) api.NewOrchestrationOptions {
32+
return api.WithInstanceID(api.InstanceID(id))
33+
}
34+
35+
// TODO: Implement WithOrchestrationIdReusePolicy
36+
37+
func WithInput(input any) api.NewOrchestrationOptions {
38+
return api.WithInput(input)
39+
}
40+
41+
func WithRawInput(input string) api.NewOrchestrationOptions {
42+
return api.WithRawInput(input)
43+
}
44+
45+
func WithStartTime(time time.Time) api.NewOrchestrationOptions {
46+
return api.WithStartTime(time)
47+
}
48+
49+
func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions {
50+
return api.WithFetchPayloads(fetchPayloads)
51+
}
52+
53+
func WithEventPayload(data any) api.RaiseEventOptions {
54+
return api.WithEventPayload(data)
55+
}
56+
57+
func WithRawEventData(data string) api.RaiseEventOptions {
58+
return api.WithRawEventData(data)
59+
}
60+
61+
func WithOutput(data any) api.TerminateOptions {
62+
return api.WithOutput(data)
63+
}
64+
65+
func WithRawOutput(data string) api.TerminateOptions {
66+
return api.WithRawOutput(data)
67+
}
68+
69+
// TODO: Implement mocks
70+
71+
func NewClient() (client, error) { // TODO: Implement custom connection
72+
daprClient, err := dapr.NewClient()
73+
if err != nil {
74+
return client{}, err
75+
}
76+
77+
taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
78+
79+
return client{
80+
taskHubClient: taskHubClient,
81+
}, nil
82+
}
83+
84+
func (c *client) ScheduleNewWorkflow(ctx context.Context, workflow string, opts ...api.NewOrchestrationOptions) (id string, err error) {
85+
if workflow == "" {
86+
return "", errors.New("no workflow specified")
87+
}
88+
workflowID, err := c.taskHubClient.ScheduleNewOrchestration(ctx, workflow, opts...)
89+
if err != nil {
90+
return "", err
91+
}
92+
return string(workflowID), nil
93+
}
94+
95+
func (c *client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
96+
if id == "" {
97+
return nil, errors.New("no workflow id specified")
98+
}
99+
return c.taskHubClient.FetchOrchestrationMetadata(ctx, api.InstanceID(id), opts...)
100+
}
101+
102+
func (c *client) WaitForWorkflowStart(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
103+
if id == "" {
104+
return nil, errors.New("no workflow id specified")
105+
}
106+
return c.taskHubClient.WaitForOrchestrationStart(ctx, api.InstanceID(id), opts...)
107+
}
108+
109+
func (c *client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
110+
if id == "" {
111+
return nil, errors.New("no workflow id specified")
112+
}
113+
return c.taskHubClient.WaitForOrchestrationCompletion(ctx, api.InstanceID(id), opts...)
114+
}
115+
116+
func (c *client) TerminateWorkflow(ctx context.Context, id string, opts ...api.TerminateOptions) error {
117+
if id == "" {
118+
return errors.New("no workflow id specified")
119+
}
120+
return c.taskHubClient.TerminateOrchestration(ctx, api.InstanceID(id), opts...)
121+
}
122+
123+
func (c *client) RaiseEvent(ctx context.Context, id, eventName string, opts ...api.RaiseEventOptions) error {
124+
if id == " " {
125+
return errors.New("no workflow id specified")
126+
}
127+
if eventName == "" {
128+
return errors.New("no event name specified")
129+
}
130+
return c.taskHubClient.RaiseEvent(ctx, api.InstanceID(id), eventName, opts...)
131+
}
132+
133+
func (c *client) SuspendWorkflow(ctx context.Context, id, reason string) error {
134+
if id == "" {
135+
return errors.New("no workflow id specified")
136+
}
137+
if reason == "" {
138+
return errors.New("no reason specified")
139+
}
140+
return c.taskHubClient.SuspendOrchestration(ctx, api.InstanceID(id), reason)
141+
}
142+
143+
func (c *client) ResumeWorkflow(ctx context.Context, id, reason string) error {
144+
if id == "" {
145+
return errors.New("no workflow id specified")
146+
}
147+
if reason == "" {
148+
return errors.New("no reason specified")
149+
}
150+
return c.taskHubClient.ResumeOrchestration(ctx, api.InstanceID(id), reason)
151+
}
152+
153+
func (c *client) PurgeWorkflow(ctx context.Context, id string) error {
154+
if id == "" {
155+
return errors.New("no workflow id specified")
156+
}
157+
return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id))
158+
}
159+
160+
func (c *client) Close() error {
161+
return nil
162+
}

Diff for: workflow/client_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package workflow
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestNewClient(t *testing.T) {
11+
// Currently will always fail if no dapr connection available
12+
client, err := NewClient()
13+
assert.Empty(t, client)
14+
require.Error(t, err)
15+
}

Diff for: workflow/runtime.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ import (
1313
dapr "github.com/dapr/go-sdk/client"
1414

1515
"github.com/microsoft/durabletask-go/backend"
16-
"github.com/microsoft/durabletask-go/client"
16+
durabletaskclient "github.com/microsoft/durabletask-go/client"
1717
"github.com/microsoft/durabletask-go/task"
1818
)
1919

2020
type WorkflowRuntime struct {
2121
tasks *task.TaskRegistry
22-
client *client.TaskHubGrpcClient
22+
client *durabletaskclient.TaskHubGrpcClient
2323

2424
mutex sync.Mutex // TODO: implement
2525
quit chan bool
@@ -38,7 +38,7 @@ func NewRuntime() (*WorkflowRuntime, error) {
3838

3939
return &WorkflowRuntime{
4040
tasks: task.NewTaskRegistry(),
41-
client: client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()),
41+
client: durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()),
4242
quit: make(chan bool),
4343
close: daprClient.Close,
4444
}, nil

Diff for: workflow/workflow.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package workflow
2+
3+
import "time"
4+
5+
type Metadata struct {
6+
InstanceID string `json:"id"`
7+
Name string `json:"name"`
8+
RuntimeStatus Status `json:"status"`
9+
CreatedAt time.Time `json:"createdAt"`
10+
LastUpdatedAt time.Time `json:"lastUpdatedAt"`
11+
SerializedInput string `json:"serializedInput"`
12+
SerializedOutput string `json:"serializedOutput"`
13+
SerializedCustomStatus string `json:"serializedCustomStatus"`
14+
FailureDetails *FailureDetails `json:"failureDetails"`
15+
}
16+
17+
type FailureDetails struct {
18+
Type string `json:"type"`
19+
Message string `json:"message"`
20+
StackTrace string `json:"stackTrace"`
21+
InnerFailure *FailureDetails `json:"innerFailure"`
22+
}

0 commit comments

Comments
 (0)