Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update streaming subscription to understand new initial response #601

Merged
merged 4 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh
DAPR_CLI_REF: 19b9de05611ade540b06d2c061f32f6c37093a17
DAPR_CLI_REF: e72f95393cc0c66b0cc4d726f5c45f80b916b400
JoshVanL marked this conversation as resolved.
Show resolved Hide resolved
DAPR_REF: ${{ github.event.inputs.daprdapr_commit }}
CHECKOUT_REPO: ${{ github.repository }}
CHECKOUT_REF: ${{ github.ref }}
outputs:
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
JoshVanL marked this conversation as resolved.
Show resolved Hide resolved
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_RUNTIME_VER: 1.14.0-rc.4
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
Expand Down Expand Up @@ -150,7 +150,7 @@ jobs:
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_RUNTIME_VER: 1.14.0-rc.4
JoshVanL marked this conversation as resolved.
Show resolved Hide resolved
DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }}
DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }}
CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }}
Expand All @@ -169,7 +169,7 @@ jobs:
"grpc-service",
"hello-world",
"pubsub",
"bidipubsub",
"streamsub",
"service",
"socket",
"workflow",
Expand Down
26 changes: 20 additions & 6 deletions client/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@
return nil, err
}

return &Subscription{
s := &Subscription{

Check warning on line 57 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L57

Added line #L57 was not covered by tests
stream: stream,
}, nil
}

return s, nil

Check warning on line 61 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L61

Added line #L61 was not covered by tests
}

func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
Expand Down Expand Up @@ -99,10 +101,11 @@
}

func (s *Subscription) Receive() (*SubscriptionMessage, error) {
event, err := s.stream.Recv()
resp, err := s.stream.Recv()

Check warning on line 104 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L104

Added line #L104 was not covered by tests
if err != nil {
return nil, err
}
event := resp.GetEventMessage()

Check warning on line 108 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L108

Added line #L108 was not covered by tests

data := any(event.GetData())
if len(event.GetData()) > 0 {
Expand Down Expand Up @@ -181,8 +184,8 @@
defer s.sub.lock.Unlock()

return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{
EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{
EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{

Check warning on line 188 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L187-L188

Added lines #L187 - L188 were not covered by tests
Id: s.ID,
Status: &pb.TopicEventResponse{Status: status},
},
Expand All @@ -206,7 +209,7 @@

err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{
InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{

Check warning on line 212 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L212

Added line #L212 was not covered by tests
PubsubName: opts.PubsubName, Topic: opts.Topic,
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
},
Expand All @@ -216,5 +219,16 @@
return nil, errors.Join(err, stream.CloseSend())
}

resp, err := stream.Recv()
if err != nil {
return nil, errors.Join(err, stream.CloseSend())

Check warning on line 224 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L222-L224

Added lines #L222 - L224 were not covered by tests
}

switch resp.GetSubscribeTopicEventsResponseType().(type) {
case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse:
default:
return nil, fmt.Errorf("unexpected initial response from server : %v", resp)

Check warning on line 230 in client/subscribe.go

View check run for this annotation

Codecov / codecov/patch

client/subscribe.go#L227-L230

Added lines #L227 - L230 were not covered by tests
}

return stream, nil
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dapr/dapr v1.14.0-rc.2 // indirect
github.com/dapr/dapr v1.14.0-rc.5 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY=
github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dapr run --app-id sub \
--dapr-http-port 3500 \
--log-level debug \
--resources-path ./config \
go run bidisub/bidisub.go
go run sub/sub.go
```

<!-- END_STEP -->
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,27 @@ func main() {
eventHandler,
)
if err != nil {
log.Fatal(err)
log.Fatalf("failed to subscribe to topic: %v", err)
}
fmt.Printf(">>Created subscription messages/sendorder\n")

// Another method of streaming subscriptions, this time for the topic "neworder".
// The returned `sub` object is used to receive messages.
// `sub` must be closed once it's no longer needed.

sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{
PubsubName: "messages",
Topic: "neworder",
DeadLetterTopic: &deadLetterTopic,
})
if err != nil {
log.Fatal(err)
log.Fatalf("failed to subscribe to topic: %v", err)
}
fmt.Printf(">>Created subscription\n")
fmt.Printf(">>Created subscription messages/neworder\n")

for i := 0; i < 3; i++ {
msg, err := sub.Receive()
if err != nil {
log.Fatalf("error receiving message: %v", err)
log.Fatalf("Error receiving message: %v", err)
}
log.Printf(">>Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dapr/go-sdk
go 1.22.5

require (
github.com/dapr/dapr v1.14.0-rc.2
github.com/dapr/dapr v1.14.0-rc.5
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY=
github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
Expand Down