Skip to content

Commit

Permalink
fix: migrate to dapr builtin client
Browse files Browse the repository at this point in the history
Signed-off-by: mikeee <[email protected]>
  • Loading branch information
mikeee committed Jan 8, 2024
1 parent 81eab90 commit b8c0b1a
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 36 deletions.
7 changes: 0 additions & 7 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,4 @@ jobs:
- name: Check Examples
run: |
cd examples
./validate.sh actor
./validate.sh configuration
./validate.sh grpc-service
./validate.sh hello-world
./validate.sh pubsub
./validate.sh service
./validate.sh socket
./validate.sh workflow
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ type Client interface {

// GrpcClient returns the base grpc client if grpc is used and nil otherwise
GrpcClient() pb.DaprClient

GrpcClientConn() *grpc.ClientConn
}

// NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port.
Expand Down
6 changes: 4 additions & 2 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ expected_stdout_lines:
- '== APP == Runtime initialized'
- '== APP == TestWorkflow registered'
- '== APP == TestActivity registered'
- '== APP == runner 1'
- '== APP == runner started'
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow paused'
- '== APP == workflow resumed'
- '== APP == stage: 1'
- '== APP == workflow event raised'
- '== APP == stage: 2'
- '== APP == workflow status: COMPLETED'
Expand Down Expand Up @@ -51,10 +52,11 @@ dapr run --app-id workflow-sequential \
- '== APP == Runtime initialized'
- '== APP == TestWorkflow registered'
- '== APP == TestActivity registered'
- '== APP == runner 1'
- '== APP == runner started'
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow paused'
- '== APP == workflow resumed'
- '== APP == stage: 1'
- '== APP == workflow event raised'
- '== APP == stage: 2'
- '== APP == workflow status: COMPLETED'
Expand Down
17 changes: 11 additions & 6 deletions examples/workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package main
import (
"context"
"fmt"
"google.golang.org/grpc/metadata"
"log"
"sync"
"time"

"google.golang.org/grpc/metadata"

"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
)
Expand All @@ -19,12 +20,12 @@ const (
)

func main() {
wr, err := workflow.NewRuntime("localhost", "50001")
wr, err := workflow.NewRuntime("dns:127.0.0.1:50001")
if err != nil {
log.Fatal(err)
}

fmt.Println("Runtime initialized")
fmt.Println("WorkflowRuntime initialized")

if err := wr.RegisterWorkflow(TestWorkflow); err != nil {
log.Fatal(err)
Expand All @@ -38,8 +39,8 @@ func main() {

var wg sync.WaitGroup

// start workflow runner
fmt.Println("runner 1")
// Start workflow runner
fmt.Println("runner started")
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -48,6 +49,8 @@ func main() {
}
}()

time.Sleep(time.Second * 5)

daprClient, err := client.NewClient()
defer daprClient.Close()
if err != nil {
Expand Down Expand Up @@ -116,7 +119,9 @@ func main() {
log.Fatalf("workflow not running")
}

fmt.Printf("workflow resumed\n")
fmt.Println("workflow resumed")

fmt.Printf("stage: %d\n", stage)

// Raise Event Test

Expand Down
33 changes: 15 additions & 18 deletions workflow/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,40 @@ import (
"runtime"
"strings"
"sync"
"time"

"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/client"
"github.com/microsoft/durabletask-go/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

dapr "github.com/dapr/go-sdk/client"
)

type WorkflowRuntime struct {
tasks *task.TaskRegistry
client *client.TaskHubGrpcClient

mutex sync.Mutex // TODO: implement
quit chan bool
cancel context.CancelFunc
mutex sync.Mutex // TODO: implement
quit chan bool
}

type Workflow func(ctx *Context) (any, error)

type Activity func(ctx ActivityContext) (any, error)

func NewRuntime(host string, port string) (*WorkflowRuntime, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) // TODO: add timeout option
func NewRuntime(address string) (*WorkflowRuntime, error) {
if address == "" {
return &WorkflowRuntime{}, errors.New("no address provided")
}

Check warning on line 35 in workflow/runtime.go

View check run for this annotation

Codecov / codecov/patch

workflow/runtime.go#L34-L35

Added lines #L34 - L35 were not covered by tests

ctx, cancel := context.WithCancel(context.Background()) // TODO: add timeout option
defer cancel()

address := fmt.Sprintf("%s:%s", host, port)
daprClient, err := dapr.NewClientWithAddressContext(ctx, address)
if err != nil {
return nil, err
}

clientConn, err := grpc.DialContext(
ctx,
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), // TODO: config
)
clientConn := daprClient.GrpcClientConn()
if err != nil {
return &WorkflowRuntime{}, fmt.Errorf("failed to create runtime - grpc connection failed: %v", err)
}

Check warning on line 48 in workflow/runtime.go

View check run for this annotation

Codecov / codecov/patch

workflow/runtime.go#L45-L48

Added lines #L45 - L48 were not covered by tests
Expand All @@ -51,7 +51,6 @@ func NewRuntime(host string, port string) (*WorkflowRuntime, error) {
tasks: task.NewTaskRegistry(),
client: client.NewTaskHubGrpcClient(clientConn, backend.DefaultLogger()),
quit: make(chan bool),
cancel: cancel,
}, nil

Check warning on line 54 in workflow/runtime.go

View check run for this annotation

Codecov / codecov/patch

workflow/runtime.go#L50-L54

Added lines #L50 - L54 were not covered by tests
}

Expand Down Expand Up @@ -127,8 +126,6 @@ func (wr *WorkflowRuntime) Start() error {
}

func (wr *WorkflowRuntime) Shutdown() error {
// cancel grpc context
wr.cancel()
// send close signal
wr.quit <- true
log.Println("work item listener shutdown signal sent")
Expand Down
5 changes: 2 additions & 3 deletions workflow/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

func TestNewRuntime(t *testing.T) {
t.Run("failure to create newruntime without dapr", func(t *testing.T) {
wr, err := NewRuntime("localhost", "50001")
wr, err := NewRuntime(":50001")
require.Error(t, err)
assert.Equal(t, &WorkflowRuntime{}, wr)
assert.Empty(t, wr)
})
}

Expand All @@ -24,7 +24,6 @@ func TestWorkflowRuntime(t *testing.T) {
client: nil,
mutex: sync.Mutex{},
quit: nil,
cancel: nil,
}

// TODO: Mock grpc conn - currently requires dapr to be available
Expand Down

0 comments on commit b8c0b1a

Please sign in to comment.