From b7b90e3f8eb2ad20c3732f7aede0a45f24f9a265 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Thu, 11 Jul 2024 15:43:51 +0100 Subject: [PATCH] Bidirectional Subscriptions (#578) * Bidirectional Subscriptions Adds support for bidirectional subscriptions to PubSubs. Adds two methods for subscribing- one using a callback and one using an imperative approach. Both giving support to different programming styles or use cases. Adds example with tests. Signed-off-by: joshvanl * Linting: Remove unused `closeCh` Signed-off-by: joshvanl * Fixes comment order in bidisub.go Signed-off-by: joshvanl * Add comment about processing message Signed-off-by: joshvanl * Adds dead letter topic example Signed-off-by: joshvanl * chore: remove go.mod Signed-off-by: mikeee * Updates go mod to v1.14.0-rc.1 Signed-off-by: joshvanl --------- Signed-off-by: joshvanl Signed-off-by: mikeee Signed-off-by: Mike Nguyen Co-authored-by: mikeee --- .github/workflows/validate_examples.yaml | 1 + client/client.go | 8 + client/state.go | 3 +- client/subscribe.go | 220 +++++++++++++++++++++++ examples/bidipubsub/README.md | 71 ++++++++ examples/bidipubsub/bidisub/bidisub.go | 92 ++++++++++ examples/bidipubsub/config/pubsub.yaml | 12 ++ examples/bidipubsub/pub/pub.go | 60 +++++++ go.mod | 24 +-- go.sum | 44 ++--- service/common/type.go | 10 +- service/http/topic.go | 2 +- 12 files changed, 508 insertions(+), 39 deletions(-) create mode 100644 client/subscribe.go create mode 100644 examples/bidipubsub/README.md create mode 100644 examples/bidipubsub/bidisub/bidisub.go create mode 100644 examples/bidipubsub/config/pubsub.yaml create mode 100644 examples/bidipubsub/pub/pub.go diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index 3c5537c6..64c5709e 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -162,6 +162,7 @@ jobs: "grpc-service", "hello-world", "pubsub", + "bidipubsub", "service", "socket", "workflow", diff --git a/client/client.go b/client/client.go index 762bb4b1..98a835fa 100644 --- a/client/client.go +++ b/client/client.go @@ -162,6 +162,14 @@ type Client interface { // UnsubscribeConfigurationItems can stop the subscription with target store's and id UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error + // Subscribe subscribes to a pubsub topic and streams messages to the returned Subscription. + // Subscription must be closed after finishing with subscribing. + Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) + + // SubscribeWithHandler subscribes to a pubsub topic and calls the given handler on topic events. + // The returned cancel function must be called after finishing with subscribing. + SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) + // DeleteBulkState deletes content for multiple keys from store. DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error diff --git a/client/state.go b/client/state.go index 8b8626a3..e2c70629 100644 --- a/client/state.go +++ b/client/state.go @@ -382,7 +382,8 @@ func (c *GRPCClient) GetBulkState(ctx context.Context, storeName string, keys [] // GetState retrieves state from specific store using default consistency option. func (c *GRPCClient) GetState(ctx context.Context, storeName, key string, meta map[string]string) (item *StateItem, err error) { - return c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong) + i, err := c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong) + return i, err } // GetStateWithConsistency retrieves state from specific store using provided state consistency. diff --git a/client/subscribe.go b/client/subscribe.go new file mode 100644 index 00000000..31931c9d --- /dev/null +++ b/client/subscribe.go @@ -0,0 +1,220 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "mime" + "strings" + "sync" + "sync/atomic" + + pb "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/go-sdk/service/common" +) + +type SubscriptionHandleFunction func(event *common.TopicEvent) common.SubscriptionResponseStatus + +type SubscriptionOptions struct { + PubsubName string + Topic string + DeadLetterTopic *string + Metadata map[string]string +} + +type Subscription struct { + stream pb.Dapr_SubscribeTopicEventsAlpha1Client + // lock locks concurrent writes to subscription stream. + lock sync.Mutex + closed atomic.Bool +} + +type SubscriptionMessage struct { + *common.TopicEvent + sub *Subscription +} + +func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) { + stream, err := c.subscribeInitialRequest(ctx, opts) + if err != nil { + return nil, err + } + + return &Subscription{ + stream: stream, + }, nil +} + +func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) { + s, err := c.Subscribe(ctx, opts) + if err != nil { + return nil, err + } + + go func() { + defer s.Close() + + for { + msg, err := s.Receive() + if err != nil { + if !s.closed.Load() { + logger.Printf("Error receiving messages from subscription pubsub=%s topic=%s, closing subscription: %s", + opts.PubsubName, opts.Topic, err) + } + return + } + + go func() { + if err := msg.respondStatus(handler(msg.TopicEvent)); err != nil { + logger.Printf("Error responding to topic with event status pubsub=%s topic=%s message_id=%s: %s", + opts.PubsubName, opts.Topic, msg.ID, err) + } + }() + } + }() + + return s.Close, nil +} + +func (s *Subscription) Close() error { + if !s.closed.CompareAndSwap(false, true) { + return errors.New("subscription already closed") + } + + return s.stream.CloseSend() +} + +func (s *Subscription) Receive() (*SubscriptionMessage, error) { + event, err := s.stream.Recv() + if err != nil { + return nil, err + } + + data := any(event.GetData()) + if len(event.GetData()) > 0 { + mediaType, _, err := mime.ParseMediaType(event.GetDataContentType()) + if err == nil { + var v interface{} + switch mediaType { + case "application/json": + if err := json.Unmarshal(event.GetData(), &v); err == nil { + data = v + } + case "text/plain": + // Assume UTF-8 encoded string. + data = string(event.GetData()) + default: + if strings.HasPrefix(mediaType, "application/") && + strings.HasSuffix(mediaType, "+json") { + if err := json.Unmarshal(event.GetData(), &v); err == nil { + data = v + } + } + } + } + } + + topicEvent := &common.TopicEvent{ + ID: event.GetId(), + Source: event.GetSource(), + Type: event.GetType(), + SpecVersion: event.GetSpecVersion(), + DataContentType: event.GetDataContentType(), + Data: data, + RawData: event.GetData(), + Topic: event.GetTopic(), + PubsubName: event.GetPubsubName(), + } + + return &SubscriptionMessage{ + sub: s, + TopicEvent: topicEvent, + }, nil +} + +func (s *SubscriptionMessage) Success() error { + return s.respond(pb.TopicEventResponse_SUCCESS) +} + +func (s *SubscriptionMessage) Retry() error { + return s.respond(pb.TopicEventResponse_RETRY) +} + +func (s *SubscriptionMessage) Drop() error { + return s.respond(pb.TopicEventResponse_DROP) +} + +func (s *SubscriptionMessage) respondStatus(status common.SubscriptionResponseStatus) error { + var statuspb pb.TopicEventResponse_TopicEventResponseStatus + switch status { + case common.SubscriptionResponseStatusSuccess: + statuspb = pb.TopicEventResponse_SUCCESS + case common.SubscriptionResponseStatusRetry: + statuspb = pb.TopicEventResponse_RETRY + case common.SubscriptionResponseStatusDrop: + statuspb = pb.TopicEventResponse_DROP + default: + return fmt.Errorf("unknown status, expected one of %s, %s, %s: %s", + common.SubscriptionResponseStatusSuccess, common.SubscriptionResponseStatusRetry, + common.SubscriptionResponseStatusDrop, status) + } + + return s.respond(statuspb) +} + +func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventResponseStatus) error { + s.sub.lock.Lock() + defer s.sub.lock.Unlock() + + return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{ + SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{ + EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{ + Id: s.ID, + Status: &pb.TopicEventResponse{Status: status}, + }, + }, + }) +} + +func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) { + if len(opts.PubsubName) == 0 { + return nil, errors.New("pubsub name required") + } + + if len(opts.Topic) == 0 { + return nil, errors.New("topic required") + } + + stream, err := c.protoClient.SubscribeTopicEventsAlpha1(ctx) + if err != nil { + return nil, err + } + + err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{ + SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{ + InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{ + PubsubName: opts.PubsubName, Topic: opts.Topic, + Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic, + }, + }, + }) + if err != nil { + return nil, errors.Join(err, stream.CloseSend()) + } + + return stream, nil +} diff --git a/examples/bidipubsub/README.md b/examples/bidipubsub/README.md new file mode 100644 index 00000000..2bde2b7e --- /dev/null +++ b/examples/bidipubsub/README.md @@ -0,0 +1,71 @@ +# Dapr PubSub Example with go-sdk + +This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API. + +## Diagram + +![](https://i.loli.net/2020/08/23/5MBYgwqCZcXNUf2.jpg) + +## Step + +### Prepare + +- Dapr installed + +### Run Subscriber Server + + + +```bash +dapr run --app-id sub \ + --dapr-http-port 3500 \ + --log-level debug \ + --resources-path ./config \ + go run bidisub/bidisub.go +``` + + + +### Run Publisher + + + +```bash +dapr run --app-id pub \ + --log-level debug \ + --resources-path ./config \ + go run pub/pub.go +``` + + + +## Result + +```shell +== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping123 +``` diff --git a/examples/bidipubsub/bidisub/bidisub.go b/examples/bidipubsub/bidisub/bidisub.go new file mode 100644 index 00000000..34d69114 --- /dev/null +++ b/examples/bidipubsub/bidisub/bidisub.go @@ -0,0 +1,92 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + daprd "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/service/common" +) + +func main() { + client, err := daprd.NewClient() + if err != nil { + log.Fatal(err) + } + + var deadLetterTopic = "deadletter" + + // Streaming subscription for topic "sendorder" on pubsub component + // "messages". The given subscription handler is called when a message is + // received. The returned `stop` function is used to stop the subscription + // and close the connection. + stop, err := client.SubscribeWithHandler(context.Background(), + daprd.SubscriptionOptions{ + PubsubName: "messages", + Topic: "sendorder", + DeadLetterTopic: &deadLetterTopic, + }, + eventHandler, + ) + if err != nil { + log.Fatal(err) + } + + // Another method of streaming subscriptions, this time for the topic "neworder". + // The returned `sub` object is used to receive messages. + // `sub` must be closed once it's no longer needed. + + sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{ + PubsubName: "pubsub", + Topic: "neworder", + DeadLetterTopic: &deadLetterTopic, + }) + if err != nil { + log.Fatal(err) + } + fmt.Printf(">>Created subscription\n") + + for i := 0; i < 3; i++ { + msg, err := sub.Receive() + if err != nil { + log.Fatalf("error receiving message: %v", err) + } + log.Printf(">>Received message\n") + log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData) + + // Use _MUST_ always signal the result of processing the message, else the + // message will not be considered as processed and will be redelivered or + // dead lettered. + if err := msg.Success(); err != nil { + log.Fatalf("error sending message success: %v", err) + } + } + + time.Sleep(time.Second * 5) + + if err := errors.Join(stop(), sub.Close()); err != nil { + log.Fatal(err) + } +} + +func eventHandler(e *common.TopicEvent) common.SubscriptionResponseStatus { + log.Printf(">>Received message\n") + log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", e.PubsubName, e.Topic, e.ID, e.Data) + return common.SubscriptionResponseStatusSuccess +} diff --git a/examples/bidipubsub/config/pubsub.yaml b/examples/bidipubsub/config/pubsub.yaml new file mode 100644 index 00000000..54b33387 --- /dev/null +++ b/examples/bidipubsub/config/pubsub.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messages +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/examples/bidipubsub/pub/pub.go b/examples/bidipubsub/pub/pub.go new file mode 100644 index 00000000..6f13afdb --- /dev/null +++ b/examples/bidipubsub/pub/pub.go @@ -0,0 +1,60 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "log" + + dapr "github.com/dapr/go-sdk/client" +) + +var ( + // set the environment as instructions. + pubsubName = "messages" + topicName1 = "sendorder" + topicName2 = "neworder" +) + +func main() { + ctx := context.Background() + publishEventData := []byte("ping123") + publishEventsData := []interface{}{"multi-ping", "multi-pong"} + + client, err := dapr.NewClient() + if err != nil { + log.Fatalf("error creating dapr client: %v", err) + } + defer client.Close() + + // Publish a single event + log.Println("sending message") + if err := client.PublishEvent(ctx, pubsubName, topicName1, publishEventData); err != nil { + log.Fatalf("error publishing event: %v", err) + } + if err := client.PublishEvent(ctx, pubsubName, topicName2, publishEventData); err != nil { + log.Fatalf("error publishing event: %v", err) + } + log.Println("message published") + + // Publish multiple events + log.Println("sending multiple messages") + if res := client.PublishEvents(ctx, pubsubName, topicName1, publishEventsData); res.Error != nil { + log.Fatalf("error publishing events: %v", res.Error) + } + if res := client.PublishEvents(ctx, pubsubName, topicName2, publishEventsData); res.Error != nil { + log.Fatalf("error publishing events: %v", res.Error) + } + log.Println("multiple messages published") +} diff --git a/go.mod b/go.mod index 5ab91420..e97295c7 100644 --- a/go.mod +++ b/go.mod @@ -1,34 +1,36 @@ module github.com/dapr/go-sdk -go 1.21.8 +go 1.22.4 + +toolchain go1.22.5 require ( - github.com/dapr/dapr v1.13.4 + github.com/dapr/dapr v1.14.0-rc.1 github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 - github.com/microsoft/durabletask-go v0.4.1-0.20240621011625-bfcc3331ca58 - github.com/stretchr/testify v1.8.4 + github.com/microsoft/durabletask-go v0.5.0 + github.com/stretchr/testify v1.9.0 google.golang.org/grpc v1.65.0 - google.golang.org/protobuf v1.34.1 + google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/kr/text v0.2.0 // indirect github.com/marusama/semaphore/v2 v2.5.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - go.opentelemetry.io/otel v1.23.1 // indirect - go.opentelemetry.io/otel/metric v1.23.1 // indirect - go.opentelemetry.io/otel/trace v1.23.1 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index 40d4d7c3..d13ce858 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,15 @@ -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/dapr/dapr v1.13.4 h1:cwWA8qx7ALbx8YLOSI0ZJ6dSqxGSeHkqU5f/CVcrfvE= -github.com/dapr/dapr v1.13.4/go.mod h1:v7xjV+3dP8zKaSlvUJRKoPsBby2CosobCBTZzHbahcs= +github.com/dapr/dapr v1.14.0-rc.1 h1:4P376+PIU66hMtLz5TiF41IJ6Lh5FNY1DiwaNNYZv/8= +github.com/dapr/dapr v1.14.0-rc.1/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= @@ -28,23 +28,23 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.4.1-0.20240621011625-bfcc3331ca58 h1:+HZ6RzZz6YBfA+Chtn0SnMU2OgY6nafl2sGbZ9FmerY= -github.com/microsoft/durabletask-go v0.4.1-0.20240621011625-bfcc3331ca58/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18= +github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.opentelemetry.io/otel v1.23.1 h1:Za4UzOqJYS+MUczKI320AtqZHZb7EqxO00jAHE0jmQY= -go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA= -go.opentelemetry.io/otel/metric v1.23.1 h1:PQJmqJ9u2QaJLBOELl1cxIdPcpbwzbkjfEyelTl2rlo= -go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI= -go.opentelemetry.io/otel/trace v1.23.1 h1:4LrmmEd8AU2rFvU1zegmvqW7+kWarxtNOPyeL6HmYY8= -go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= -golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -71,12 +71,12 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/service/common/type.go b/service/common/type.go index 9ea00cad..a134740f 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -105,16 +105,18 @@ type Subscription struct { DisableTopicValidation bool `json:"disableTopicValidation"` } +type SubscriptionResponseStatus string + const ( // SubscriptionResponseStatusSuccess means message is processed successfully. - SubscriptionResponseStatusSuccess = "SUCCESS" + SubscriptionResponseStatusSuccess SubscriptionResponseStatus = "SUCCESS" // SubscriptionResponseStatusRetry means message to be retried by Dapr. - SubscriptionResponseStatusRetry = "RETRY" + SubscriptionResponseStatusRetry SubscriptionResponseStatus = "RETRY" // SubscriptionResponseStatusDrop means warning is logged and message is dropped. - SubscriptionResponseStatusDrop = "DROP" + SubscriptionResponseStatusDrop SubscriptionResponseStatus = "DROP" ) // SubscriptionResponse represents the response handling hint from subscriber to Dapr. type SubscriptionResponse struct { - Status string `json:"status"` + Status SubscriptionResponseStatus `json:"status"` } diff --git a/service/http/topic.go b/service/http/topic.go index 09c4ce90..9e04eaa6 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -327,7 +327,7 @@ func getCustomMetdataFromHeaders(r *http.Request) map[string]string { return md } -func writeStatus(w http.ResponseWriter, s string) { +func writeStatus(w http.ResponseWriter, s common.SubscriptionResponseStatus) { status := &common.SubscriptionResponse{Status: s} if err := json.NewEncoder(w).Encode(status); err != nil { http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)