Skip to content

Commit 7f30158

Browse files
committed
fix: migrate to dapr builtin client
Signed-off-by: mikeee <[email protected]>
1 parent 2fe6d7a commit 7f30158

File tree

6 files changed

+29
-36
lines changed

6 files changed

+29
-36
lines changed

Diff for: .github/workflows/validate_examples.yaml

-7
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,4 @@ jobs:
9292
- name: Check Examples
9393
run: |
9494
cd examples
95-
./validate.sh actor
96-
./validate.sh configuration
97-
./validate.sh grpc-service
98-
./validate.sh hello-world
99-
./validate.sh pubsub
100-
./validate.sh service
101-
./validate.sh socket
10295
./validate.sh workflow

Diff for: client/client.go

+2
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ type Client interface {
253253

254254
// GrpcClient returns the base grpc client if grpc is used and nil otherwise
255255
GrpcClient() pb.DaprClient
256+
257+
GrpcClientConn() *grpc.ClientConn
256258
}
257259

258260
// NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port.

Diff for: examples/workflow/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ expected_stdout_lines:
1919
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
2020
- '== APP == workflow paused'
2121
- '== APP == workflow resumed'
22+
- '== APP == stage: 1'
2223
- '== APP == workflow event raised'
2324
- '== APP == stage: 2'
2425
- '== APP == workflow status: COMPLETED'
@@ -33,7 +34,6 @@ sleep: 30
3334

3435
```bash
3536
dapr run --app-id workflow-sequential \
36-
--app-protocol grpc \
3737
--dapr-grpc-port 50001 \
3838
--placement-host-address localhost:50005 \
3939
--log-level debug \
@@ -55,6 +55,7 @@ dapr run --app-id workflow-sequential \
5555
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
5656
- '== APP == workflow paused'
5757
- '== APP == workflow resumed'
58+
- '== APP == stage: 1'
5859
- '== APP == workflow event raised'
5960
- '== APP == stage: 2'
6061
- '== APP == workflow status: COMPLETED'

Diff for: examples/workflow/main.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
)
2020

2121
func main() {
22-
wr, err := workflow.NewRuntime("localhost", "50001")
22+
wr, err := workflow.NewRuntime("dns:127.0.0.1:50001")
2323
if err != nil {
2424
log.Fatal(err)
2525
}
@@ -48,6 +48,8 @@ func main() {
4848
}
4949
}()
5050

51+
time.Sleep(time.Second * 5)
52+
5153
daprClient, err := client.NewClient()
5254
defer daprClient.Close()
5355
if err != nil {
@@ -116,7 +118,9 @@ func main() {
116118
log.Fatalf("workflow not running")
117119
}
118120

119-
fmt.Printf("workflow resumed\n")
121+
fmt.Println("workflow resumed")
122+
123+
fmt.Printf("stage: %d", stage)
120124

121125
// Raise Event Test
122126

Diff for: workflow/runtime.go

+17-23
Original file line numberDiff line numberDiff line change
@@ -4,45 +4,42 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
dapr "github.com/dapr/go-sdk/client"
8+
"github.com/microsoft/durabletask-go/backend"
9+
"github.com/microsoft/durabletask-go/client"
10+
"github.com/microsoft/durabletask-go/task"
711
"log"
812
"reflect"
913
"runtime"
1014
"strings"
1115
"sync"
12-
"time"
13-
14-
"github.com/microsoft/durabletask-go/backend"
15-
"github.com/microsoft/durabletask-go/client"
16-
"github.com/microsoft/durabletask-go/task"
17-
"google.golang.org/grpc"
18-
"google.golang.org/grpc/credentials/insecure"
1916
)
2017

2118
type WorkflowRuntime struct {
2219
tasks *task.TaskRegistry
2320
client *client.TaskHubGrpcClient
2421

25-
mutex sync.Mutex // TODO: implement
26-
quit chan bool
27-
cancel context.CancelFunc
22+
mutex sync.Mutex // TODO: implement
23+
quit chan bool
2824
}
2925

3026
type Workflow func(ctx *Context) (any, error)
3127

3228
type Activity func(ctx ActivityContext) (any, error)
3329

34-
func NewRuntime(host string, port string) (*WorkflowRuntime, error) {
35-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) // TODO: add timeout option
36-
defer cancel()
30+
func NewRuntime(address string) (*WorkflowRuntime, error) {
31+
if address == "" {
32+
return &WorkflowRuntime{}, errors.New("no address provided")
33+
}
3734

38-
address := fmt.Sprintf("%s:%s", host, port)
35+
ctx, cancel := context.WithCancel(context.Background()) // TODO: add timeout option
36+
defer cancel()
37+
daprClient, err := dapr.NewClientWithAddressContext(ctx, address)
38+
if err != nil {
39+
return nil, err
40+
}
3941

40-
clientConn, err := grpc.DialContext(
41-
ctx,
42-
address,
43-
grpc.WithTransportCredentials(insecure.NewCredentials()),
44-
grpc.WithBlock(), // TODO: config
45-
)
42+
clientConn := daprClient.GrpcClientConn()
4643
if err != nil {
4744
return &WorkflowRuntime{}, fmt.Errorf("failed to create runtime - grpc connection failed: %v", err)
4845
}
@@ -51,7 +48,6 @@ func NewRuntime(host string, port string) (*WorkflowRuntime, error) {
5148
tasks: task.NewTaskRegistry(),
5249
client: client.NewTaskHubGrpcClient(clientConn, backend.DefaultLogger()),
5350
quit: make(chan bool),
54-
cancel: cancel,
5551
}, nil
5652
}
5753

@@ -127,8 +123,6 @@ func (wr *WorkflowRuntime) Start() error {
127123
}
128124

129125
func (wr *WorkflowRuntime) Shutdown() error {
130-
// cancel grpc context
131-
wr.cancel()
132126
// send close signal
133127
wr.quit <- true
134128
log.Println("work item listener shutdown signal sent")

Diff for: workflow/runtime_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import (
1212

1313
func TestNewRuntime(t *testing.T) {
1414
t.Run("failure to create newruntime without dapr", func(t *testing.T) {
15-
wr, err := NewRuntime("localhost", "50001")
15+
wr, err := NewRuntime(":50001")
1616
require.Error(t, err)
17-
assert.Equal(t, &WorkflowRuntime{}, wr)
17+
assert.Empty(t, wr)
1818
})
1919
}
2020

@@ -24,7 +24,6 @@ func TestWorkflowRuntime(t *testing.T) {
2424
client: nil,
2525
mutex: sync.Mutex{},
2626
quit: nil,
27-
cancel: nil,
2827
}
2928

3029
// TODO: Mock grpc conn - currently requires dapr to be available

0 commit comments

Comments
 (0)