Skip to content

Commit 9f22a3b

Browse files
committed
test - migrate to dapr clientconn
Signed-off-by: mikeee <[email protected]>
1 parent f96da1c commit 9f22a3b

File tree

5 files changed

+19
-45
lines changed

5 files changed

+19
-45
lines changed

.github/workflows/validate_examples.yaml

-7
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,4 @@ jobs:
9292
- name: Check Examples
9393
run: |
9494
cd examples
95-
./validate.sh actor
96-
./validate.sh configuration
97-
./validate.sh grpc-service
98-
./validate.sh hello-world
99-
./validate.sh pubsub
100-
./validate.sh service
101-
./validate.sh socket
10295
./validate.sh workflow

client/client.go

+2
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ type Client interface {
253253

254254
// GrpcClient returns the base grpc client if grpc is used and nil otherwise
255255
GrpcClient() pb.DaprClient
256+
257+
GrpcClientConn() *grpc.ClientConn
256258
}
257259

258260
// NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port.

examples/workflow/main.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ func main() {
4646
if err := wr.Start(); err != nil {
4747
log.Fatal(err)
4848
}
49+
time.Sleep(time.Minute)
4950
}()
5051

52+
time.Sleep(time.Second * 10)
53+
5154
daprClient, err := client.NewClient()
5255
defer daprClient.Close()
5356
if err != nil {
@@ -118,7 +121,7 @@ func main() {
118121

119122
fmt.Println("workflow resumed")
120123

121-
fmt.Printf("stage: %d", stage)
124+
fmt.Printf("stage: %d\n", stage)
122125

123126
// Raise Event Test
124127

@@ -226,8 +229,6 @@ func main() {
226229
if err := wr.Shutdown(); err != nil {
227230
log.Fatalf("failed to shutdown runtime: %v", err)
228231
}
229-
230-
time.Sleep(time.Second * 5)
231232
}
232233

233234
func TestWorkflow(ctx *workflow.Context) (any, error) {

workflow/runtime.go

+12-33
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,25 @@ package workflow
22

33
import (
44
"context"
5-
"crypto/tls"
65
"errors"
76
"fmt"
8-
"github.com/dapr/go-sdk/workflow/internal"
9-
"google.golang.org/grpc/credentials"
7+
dapr "github.com/dapr/go-sdk/client"
8+
"github.com/microsoft/durabletask-go/backend"
9+
"github.com/microsoft/durabletask-go/client"
10+
"github.com/microsoft/durabletask-go/task"
1011
"log"
1112
"reflect"
1213
"runtime"
1314
"strings"
1415
"sync"
15-
"time"
16-
17-
"github.com/microsoft/durabletask-go/backend"
18-
"github.com/microsoft/durabletask-go/client"
19-
"github.com/microsoft/durabletask-go/task"
20-
"google.golang.org/grpc"
21-
"google.golang.org/grpc/credentials/insecure"
2216
)
2317

2418
type WorkflowRuntime struct {
2519
tasks *task.TaskRegistry
2620
client *client.TaskHubGrpcClient
2721

28-
mutex sync.Mutex // TODO: implement
29-
quit chan bool
30-
cancel context.CancelFunc
22+
mutex sync.Mutex // TODO: implement
23+
quit chan bool
3124
}
3225

3326
type Workflow func(ctx *Context) (any, error)
@@ -39,28 +32,17 @@ func NewRuntime(address string) (*WorkflowRuntime, error) {
3932
return &WorkflowRuntime{}, errors.New("no address provided")
4033
}
4134

42-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) // TODO: add timeout option
35+
ctx, cancel := context.WithCancel(context.Background())
36+
defer cancel()
4337

44-
connectionOptions := []grpc.DialOption{
45-
grpc.WithUserAgent(internal.UserAgent()),
46-
grpc.WithBlock(),
47-
}
38+
daprClient, err := dapr.NewClientWithAddressContext(ctx, address)
4839

49-
parsedAddress, err := internal.ParseGRPCEndpoint(address)
50-
51-
if parsedAddress.TLS {
52-
connectionOptions = append(connectionOptions, grpc.WithTransportCredentials(credentials.NewTLS(new(tls.Config))))
53-
} else {
54-
connectionOptions = append(connectionOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
40+
if err != nil {
41+
return &WorkflowRuntime{}, fmt.Errorf("failed to create runtime - dapr client failed: %v", err)
5542
}
5643

57-
clientConn, err := grpc.DialContext(
58-
ctx,
59-
parsedAddress.Target,
60-
connectionOptions...,
61-
)
44+
clientConn := daprClient.GrpcClientConn()
6245

63-
cancel()
6446
if err != nil {
6547
return &WorkflowRuntime{}, fmt.Errorf("failed to create runtime - grpc connection failed: %v", err)
6648
}
@@ -69,7 +51,6 @@ func NewRuntime(address string) (*WorkflowRuntime, error) {
6951
tasks: task.NewTaskRegistry(),
7052
client: client.NewTaskHubGrpcClient(clientConn, backend.DefaultLogger()),
7153
quit: make(chan bool),
72-
cancel: cancel,
7354
}, nil
7455
}
7556

@@ -145,8 +126,6 @@ func (wr *WorkflowRuntime) Start() error {
145126
}
146127

147128
func (wr *WorkflowRuntime) Shutdown() error {
148-
// cancel grpc context
149-
wr.cancel()
150129
// send close signal
151130
wr.quit <- true
152131
log.Println("work item listener shutdown signal sent")

workflow/runtime_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
func TestNewRuntime(t *testing.T) {
1414
t.Run("failure to create newruntime without dapr", func(t *testing.T) {
15-
wr, err := NewRuntime("localhost", "50001")
15+
wr, err := NewRuntime(":50001")
1616
require.Error(t, err)
1717
assert.Equal(t, &WorkflowRuntime{}, wr)
1818
})
@@ -24,7 +24,6 @@ func TestWorkflowRuntime(t *testing.T) {
2424
client: nil,
2525
mutex: sync.Mutex{},
2626
quit: nil,
27-
cancel: nil,
2827
}
2928

3029
// TODO: Mock grpc conn - currently requires dapr to be available

0 commit comments

Comments
 (0)