From 5c13a2c7da4af178f8332dc5fc3af3fdb42f6fbc Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 23 Jul 2024 12:00:48 +0100 Subject: [PATCH 1/4] Update streaming subscription to understand new initial response Signed-off-by: joshvanl --- .github/workflows/validate_examples.yaml | 4 +-- client/subscribe.go | 26 ++++++++++++++----- examples/go.mod | 2 +- examples/go.sum | 4 +-- examples/{bidipubsub => streampub}/README.md | 2 +- .../config/pubsub.yaml | 0 examples/{bidipubsub => streampub}/pub/pub.go | 0 .../bidisub.go => streampub/sub/sub.go} | 10 +++---- go.mod | 2 +- go.sum | 4 +-- 10 files changed, 34 insertions(+), 20 deletions(-) rename examples/{bidipubsub => streampub}/README.md (97%) rename examples/{bidipubsub => streampub}/config/pubsub.yaml (100%) rename examples/{bidipubsub => streampub}/pub/pub.go (100%) rename examples/{bidipubsub/bidisub/bidisub.go => streampub/sub/sub.go} (90%) diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index a39d9542..f077e3b0 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -40,7 +40,7 @@ jobs: DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }} DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }} DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }} - DAPR_RUNTIME_VER: 1.14.0-rc.2 + DAPR_RUNTIME_VER: 1.14.0-rc.4 CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }} CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }} DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }} @@ -150,7 +150,7 @@ jobs: GOPROXY: https://proxy.golang.org DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }} DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }} - DAPR_RUNTIME_VER: 1.14.0-rc.2 + DAPR_RUNTIME_VER: 1.14.0-rc.4 DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }} DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }} CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }} diff --git a/client/subscribe.go b/client/subscribe.go index 31931c9d..e59da80b 100644 --- a/client/subscribe.go +++ b/client/subscribe.go @@ -54,9 +54,11 @@ func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (* return nil, err } - return &Subscription{ + s := &Subscription{ stream: stream, - }, nil + } + + return s, nil } func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) { @@ -99,10 +101,11 @@ func (s *Subscription) Close() error { } func (s *Subscription) Receive() (*SubscriptionMessage, error) { - event, err := s.stream.Recv() + resp, err := s.stream.Recv() if err != nil { return nil, err } + event := resp.GetEventMessage() data := any(event.GetData()) if len(event.GetData()) > 0 { @@ -181,8 +184,8 @@ func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventRes defer s.sub.lock.Unlock() return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{ - SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{ - EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{ + SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{ + EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{ Id: s.ID, Status: &pb.TopicEventResponse{Status: status}, }, @@ -206,7 +209,7 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{ SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{ - InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{ + InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{ PubsubName: opts.PubsubName, Topic: opts.Topic, Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic, }, @@ -216,5 +219,16 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript return nil, errors.Join(err, stream.CloseSend()) } + resp, err := stream.Recv() + if err != nil { + return nil, errors.Join(err, stream.CloseSend()) + } + + switch resp.GetSubscribeTopicEventsResponseType().(type) { + case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse: + default: + return nil, fmt.Errorf("unexpected initial response from server : %v", resp) + } + return stream, nil } diff --git a/examples/go.mod b/examples/go.mod index df4eb308..ca5a44a7 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -18,7 +18,7 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dapr/dapr v1.14.0-rc.2 // indirect + github.com/dapr/dapr v1.14.0-rc.5 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-chi/chi/v5 v5.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect diff --git a/examples/go.sum b/examples/go.sum index e2b13bfa..0e250803 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -7,8 +7,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8= -github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= +github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY= +github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/examples/bidipubsub/README.md b/examples/streampub/README.md similarity index 97% rename from examples/bidipubsub/README.md rename to examples/streampub/README.md index 2bde2b7e..5555bd82 100644 --- a/examples/bidipubsub/README.md +++ b/examples/streampub/README.md @@ -35,7 +35,7 @@ dapr run --app-id sub \ --dapr-http-port 3500 \ --log-level debug \ --resources-path ./config \ - go run bidisub/bidisub.go + go run sub/sub.go ``` diff --git a/examples/bidipubsub/config/pubsub.yaml b/examples/streampub/config/pubsub.yaml similarity index 100% rename from examples/bidipubsub/config/pubsub.yaml rename to examples/streampub/config/pubsub.yaml diff --git a/examples/bidipubsub/pub/pub.go b/examples/streampub/pub/pub.go similarity index 100% rename from examples/bidipubsub/pub/pub.go rename to examples/streampub/pub/pub.go diff --git a/examples/bidipubsub/bidisub/bidisub.go b/examples/streampub/sub/sub.go similarity index 90% rename from examples/bidipubsub/bidisub/bidisub.go rename to examples/streampub/sub/sub.go index 8d51db3e..92211b3c 100644 --- a/examples/bidipubsub/bidisub/bidisub.go +++ b/examples/streampub/sub/sub.go @@ -45,27 +45,27 @@ func main() { eventHandler, ) if err != nil { - log.Fatal(err) + log.Fatalf("failed to subscribe to topic: %v", err) } + fmt.Printf(">>Created subscription messages/sendorder\n") // Another method of streaming subscriptions, this time for the topic "neworder". // The returned `sub` object is used to receive messages. // `sub` must be closed once it's no longer needed. - sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{ PubsubName: "messages", Topic: "neworder", DeadLetterTopic: &deadLetterTopic, }) if err != nil { - log.Fatal(err) + log.Fatalf("failed to subscribe to topic: %v", err) } - fmt.Printf(">>Created subscription\n") + fmt.Printf(">>Created subscription messages/neworder\n") for i := 0; i < 3; i++ { msg, err := sub.Receive() if err != nil { - log.Fatalf("error receiving message: %v", err) + log.Fatalf("Error receiving message: %v", err) } log.Printf(">>Received message\n") log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData) diff --git a/go.mod b/go.mod index 3817e3b2..0530f1a0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/dapr/go-sdk go 1.22.5 require ( - github.com/dapr/dapr v1.14.0-rc.2 + github.com/dapr/dapr v1.14.0-rc.5 github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index cd8f26cc..95e128f4 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8= -github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= +github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY= +github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= From e85d5875910df55ed644d45ef886d407054c9979 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 23 Jul 2024 12:11:37 +0100 Subject: [PATCH 2/4] Update dapr CLI to 1.14.0-rc.6 Signed-off-by: joshvanl --- .github/workflows/validate_examples.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index f077e3b0..807ffe18 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -32,7 +32,7 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh - DAPR_CLI_REF: 19b9de05611ade540b06d2c061f32f6c37093a17 + DAPR_CLI_REF: e72f95393cc0c66b0cc4d726f5c45f80b916b400 DAPR_REF: ${{ github.event.inputs.daprdapr_commit }} CHECKOUT_REPO: ${{ github.repository }} CHECKOUT_REF: ${{ github.ref }} From 305dd16266fd2b44a543c925e20365b61ec6f565 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 23 Jul 2024 12:14:19 +0100 Subject: [PATCH 3/4] Update streamsub name in validate examples Signed-off-by: joshvanl --- .github/workflows/validate_examples.yaml | 2 +- examples/{streampub => streamsub}/README.md | 0 examples/{streampub => streamsub}/config/pubsub.yaml | 0 examples/{streampub => streamsub}/pub/pub.go | 0 examples/{streampub => streamsub}/sub/sub.go | 0 5 files changed, 1 insertion(+), 1 deletion(-) rename examples/{streampub => streamsub}/README.md (100%) rename examples/{streampub => streamsub}/config/pubsub.yaml (100%) rename examples/{streampub => streamsub}/pub/pub.go (100%) rename examples/{streampub => streamsub}/sub/sub.go (100%) diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index 807ffe18..4e41d3b3 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -169,7 +169,7 @@ jobs: "grpc-service", "hello-world", "pubsub", - "bidipubsub", + "streamsub", "service", "socket", "workflow", diff --git a/examples/streampub/README.md b/examples/streamsub/README.md similarity index 100% rename from examples/streampub/README.md rename to examples/streamsub/README.md diff --git a/examples/streampub/config/pubsub.yaml b/examples/streamsub/config/pubsub.yaml similarity index 100% rename from examples/streampub/config/pubsub.yaml rename to examples/streamsub/config/pubsub.yaml diff --git a/examples/streampub/pub/pub.go b/examples/streamsub/pub/pub.go similarity index 100% rename from examples/streampub/pub/pub.go rename to examples/streamsub/pub/pub.go diff --git a/examples/streampub/sub/sub.go b/examples/streamsub/sub/sub.go similarity index 100% rename from examples/streampub/sub/sub.go rename to examples/streamsub/sub/sub.go From 16978909ad145e7cd72c972f2ba562b62dd57734 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Tue, 23 Jul 2024 14:57:31 +0100 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Mike Nguyen Signed-off-by: Josh van Leeuwen --- .github/workflows/validate_examples.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index 4e41d3b3..de1563d8 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -32,13 +32,13 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh - DAPR_CLI_REF: e72f95393cc0c66b0cc4d726f5c45f80b916b400 + DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }} DAPR_REF: ${{ github.event.inputs.daprdapr_commit }} CHECKOUT_REPO: ${{ github.repository }} CHECKOUT_REF: ${{ github.ref }} outputs: DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }} - DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }} + DAPR_CLI_VER: 1.14.0-rc.6 DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }} DAPR_RUNTIME_VER: 1.14.0-rc.4 CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }} @@ -150,7 +150,7 @@ jobs: GOPROXY: https://proxy.golang.org DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }} DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }} - DAPR_RUNTIME_VER: 1.14.0-rc.4 + DAPR_RUNTIME_VER: ${{ needs.setup.outputs.DAPR_RUNTIME_VER }} DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }} DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }} CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }}