Skip to content

Commit dd9a2d5

Browse files
authored
workflow examples: remove use of deprecated functions (dapr#640)
* workflow examples: remove use of deprecated functions Signed-off-by: Fabian Martinez <[email protected]> * fix example tests Signed-off-by: Fabian Martinez <[email protected]> * Update README.md Signed-off-by: Fabian Martinez <[email protected]> * Update Makefile Signed-off-by: Fabian Martinez <[email protected]> --------- Signed-off-by: Fabian Martinez <[email protected]>
1 parent 516684c commit dd9a2d5

File tree

5 files changed

+41
-172
lines changed

5 files changed

+41
-172
lines changed

examples/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/golang/protobuf v1.5.4 // indirect
2727
github.com/kr/pretty v0.3.1 // indirect
2828
github.com/marusama/semaphore/v2 v2.5.0 // indirect
29-
github.com/microsoft/durabletask-go v0.5.0 // indirect
29+
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
3030
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3131
go.opentelemetry.io/otel v1.27.0 // indirect
3232
go.opentelemetry.io/otel/metric v1.27.0 // indirect

examples/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3939
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
4040
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
4141
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
42-
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
43-
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
42+
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
43+
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
4444
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
4545
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
4646
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=

examples/workflow/README.md

+1-8
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,9 @@ expected_stdout_lines:
2626
- '== APP == workflow purged'
2727
- '== APP == stage: 2'
2828
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
29+
- '== APP == workflow status: RUNNING'
2930
- '== APP == workflow terminated'
3031
- '== APP == workflow purged'
31-
- '== APP == workflow client test'
32-
- '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
33-
- '== APP == [wfclient] workflow status: RUNNING'
34-
- '== APP == [wfclient] stage: 1'
35-
- '== APP == [wfclient] event raised'
36-
- '== APP == [wfclient] stage: 2'
37-
- '== APP == [wfclient] workflow terminated'
38-
- '== APP == [wfclient] workflow purged'
3932
- '== APP == workflow worker successfully shutdown'
4033
4134
background: true

examples/workflow/main.go

+28-160
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,11 @@ import (
2020
"log"
2121
"time"
2222

23-
"github.com/dapr/go-sdk/client"
2423
"github.com/dapr/go-sdk/workflow"
2524
)
2625

2726
var stage = 0
2827

29-
const (
30-
workflowComponent = "dapr"
31-
)
32-
3328
func main() {
3429
w, err := workflow.NewWorker()
3530
if err != nil {
@@ -54,70 +49,49 @@ func main() {
5449
}
5550
fmt.Println("runner started")
5651

57-
daprClient, err := client.NewClient()
52+
wfClient, err := workflow.NewClient()
5853
if err != nil {
5954
log.Fatalf("failed to intialise client: %v", err)
6055
}
61-
defer daprClient.Close()
56+
defer wfClient.Close()
6257
ctx := context.Background()
6358

6459
// Start workflow test
65-
respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
66-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
67-
WorkflowComponent: workflowComponent,
68-
WorkflowName: "TestWorkflow",
69-
Options: nil,
70-
Input: 1,
71-
SendRawInput: false,
72-
})
60+
instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
7361
if err != nil {
7462
log.Fatalf("failed to start workflow: %v", err)
7563
}
76-
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
64+
fmt.Printf("workflow started with id: %v\n", instanceID)
7765

7866
// Pause workflow test
79-
err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
80-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
81-
WorkflowComponent: workflowComponent,
82-
})
83-
67+
err = wfClient.SuspendWorkflow(ctx, instanceID, "")
8468
if err != nil {
8569
log.Fatalf("failed to pause workflow: %v", err)
8670
}
8771

88-
respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
89-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
90-
WorkflowComponent: workflowComponent,
91-
})
72+
respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
9273
if err != nil {
93-
log.Fatalf("failed to get workflow: %v", err)
74+
log.Fatalf("failed to fetch workflow: %v", err)
9475
}
9576

96-
if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
97-
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
77+
if respFetch.RuntimeStatus != workflow.StatusSuspended {
78+
log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus)
9879
}
9980

10081
fmt.Printf("workflow paused\n")
10182

10283
// Resume workflow test
103-
err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
104-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
105-
WorkflowComponent: workflowComponent,
106-
})
107-
84+
err = wfClient.ResumeWorkflow(ctx, instanceID, "")
10885
if err != nil {
10986
log.Fatalf("failed to resume workflow: %v", err)
11087
}
11188

112-
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
113-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
114-
WorkflowComponent: workflowComponent,
115-
})
89+
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
11690
if err != nil {
11791
log.Fatalf("failed to get workflow: %v", err)
11892
}
11993

120-
if respGet.RuntimeStatus != workflow.StatusRunning.String() {
94+
if respFetch.RuntimeStatus != workflow.StatusRunning {
12195
log.Fatalf("workflow not running")
12296
}
12397

@@ -127,14 +101,7 @@ func main() {
127101

128102
// Raise Event Test
129103

130-
err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
131-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
132-
WorkflowComponent: workflowComponent,
133-
EventName: "testEvent",
134-
EventData: "testData",
135-
SendRawData: false,
136-
})
137-
104+
err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
138105
if err != nil {
139106
fmt.Printf("failed to raise event: %v", err)
140107
}
@@ -145,152 +112,53 @@ func main() {
145112

146113
fmt.Printf("stage: %d\n", stage)
147114

148-
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
149-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
150-
WorkflowComponent: workflowComponent,
151-
})
115+
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
152116
if err != nil {
153117
log.Fatalf("failed to get workflow: %v", err)
154118
}
155119

156-
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
120+
fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)
157121

158122
// Purge workflow test
159-
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
160-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
161-
WorkflowComponent: workflowComponent,
162-
})
123+
err = wfClient.PurgeWorkflow(ctx, instanceID)
163124
if err != nil {
164125
log.Fatalf("failed to purge workflow: %v", err)
165126
}
166127

167-
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
168-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
169-
WorkflowComponent: workflowComponent,
170-
})
171-
if err != nil && respGet != nil {
172-
log.Fatal("failed to purge workflow")
128+
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
129+
if err == nil || respFetch != nil {
130+
log.Fatalf("failed to purge workflow: %v", err)
173131
}
174132

175133
fmt.Println("workflow purged")
176134

177135
fmt.Printf("stage: %d\n", stage)
178136

179137
// Terminate workflow test
180-
respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
181-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
182-
WorkflowComponent: workflowComponent,
183-
WorkflowName: "TestWorkflow",
184-
Options: nil,
185-
Input: 1,
186-
SendRawInput: false,
187-
})
138+
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
188139
if err != nil {
189140
log.Fatalf("failed to start workflow: %v", err)
190141
}
142+
fmt.Printf("workflow started with id: %v\n", instanceID)
191143

192-
fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
193-
194-
err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
195-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
196-
WorkflowComponent: workflowComponent,
197-
})
144+
metadata, err := wfClient.WaitForWorkflowStart(ctx, id)
198145
if err != nil {
199-
log.Fatalf("failed to terminate workflow: %v", err)
146+
log.Fatalf("failed to get workflow: %v", err)
200147
}
148+
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
201149

202-
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
203-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
204-
WorkflowComponent: workflowComponent,
205-
})
150+
err = wfClient.TerminateWorkflow(ctx, id)
206151
if err != nil {
207-
log.Fatalf("failed to get workflow: %v", err)
208-
}
209-
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
210-
log.Fatal("failed to terminate workflow")
152+
log.Fatalf("failed to terminate workflow: %v", err)
211153
}
212-
213154
fmt.Println("workflow terminated")
214155

215-
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
216-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
217-
WorkflowComponent: workflowComponent,
218-
})
219-
220-
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
221-
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
222-
WorkflowComponent: workflowComponent,
223-
})
224-
if err == nil || respGet != nil {
156+
err = wfClient.PurgeWorkflow(ctx, id)
157+
if err != nil {
225158
log.Fatalf("failed to purge workflow: %v", err)
226159
}
227-
228160
fmt.Println("workflow purged")
229161

230-
// WFClient
231-
// TODO: Expand client validation
232-
233-
stage = 0
234-
fmt.Println("workflow client test")
235-
236-
wfClient, err := workflow.NewClient()
237-
if err != nil {
238-
log.Fatalf("[wfclient] faield to initialize: %v", err)
239-
}
240-
241-
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
242-
if err != nil {
243-
log.Fatalf("[wfclient] failed to start workflow: %v", err)
244-
}
245-
246-
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
247-
248-
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
249-
if err != nil {
250-
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
251-
}
252-
253-
fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
254-
255-
if stage != 1 {
256-
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
257-
}
258-
259-
fmt.Printf("[wfclient] stage: %d\n", stage)
260-
261-
// TODO: WaitForWorkflowStart
262-
// TODO: WaitForWorkflowCompletion
263-
264-
// raise event
265-
266-
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
267-
log.Fatalf("[wfclient] failed to raise event: %v", err)
268-
}
269-
270-
fmt.Println("[wfclient] event raised")
271-
272-
// Sleep to allow the workflow to advance
273-
time.Sleep(time.Second)
274-
275-
if stage != 2 {
276-
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
277-
}
278-
279-
fmt.Printf("[wfclient] stage: %d\n", stage)
280-
281-
// stop workflow
282-
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
283-
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
284-
}
285-
286-
fmt.Println("[wfclient] workflow terminated")
287-
288-
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
289-
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
290-
}
291-
292-
fmt.Println("[wfclient] workflow purged")
293-
294162
// stop workflow runtime
295163
if err := w.Shutdown(); err != nil {
296164
log.Fatalf("failed to shutdown runtime: %v", err)

workflow/client.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ import (
2323
"github.com/microsoft/durabletask-go/api"
2424
"github.com/microsoft/durabletask-go/backend"
2525
durabletaskclient "github.com/microsoft/durabletask-go/client"
26+
"google.golang.org/grpc"
2627

2728
dapr "github.com/dapr/go-sdk/client"
2829
)
2930

3031
type Client struct {
32+
conn *grpc.ClientConn
3133
taskHubClient *durabletaskclient.TaskHubGrpcClient
3234
}
3335

@@ -143,9 +145,11 @@ func NewClient(opts ...clientOption) (*Client, error) {
143145
return &Client{}, fmt.Errorf("failed to initialise dapr.Client: %v", err)
144146
}
145147

146-
taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
148+
conn := daprClient.GrpcClientConn()
149+
taskHubClient := durabletaskclient.NewTaskHubGrpcClient(conn, backend.DefaultLogger())
147150

148151
return &Client{
152+
conn: conn,
149153
taskHubClient: taskHubClient,
150154
}, nil
151155
}
@@ -241,3 +245,7 @@ func (c *Client) PurgeWorkflow(ctx context.Context, id string, opts ...api.Purge
241245
}
242246
return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id), opts...)
243247
}
248+
249+
func (c *Client) Close() {
250+
_ = c.conn.Close()
251+
}

0 commit comments

Comments
 (0)