Skip to content

Commit

Permalink
feat: bulksubscribe support
Browse files Browse the repository at this point in the history
Signed-off-by: sadath-12 <[email protected]>
  • Loading branch information
sadath-12 committed Dec 3, 2023
1 parent fdd425a commit cb26d03
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 127 deletions.
62 changes: 0 additions & 62 deletions examples/pubsub/sub/bulksub.go

This file was deleted.

5 changes: 5 additions & 0 deletions examples/pubsub/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ var importantSubscription = &common.Subscription{
func main() {
s := daprd.NewService(":8080")

// for single event subscribing
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

// if err := s.AddBulkTopicEventHandler(defaultSubscription, eventHandler,10,100); err != nil {
// log.Fatalf("error adding topic subscription: %v", err)
// }

if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("error listenning: %v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Service interface {
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error

AddBulkTopicEventHandler(sub *Subscription, fn BulkTopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error
// AddBulkTopicEventHandler appends provided event handler with its topic along with configuring maxMessagesCount, maxAwaitDurationMs for bulk handling and optional metadata to the service.
AddBulkTopicEventHandler(sub *Subscription, fn TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
Expand All @@ -55,7 +55,6 @@ type Service interface {
type (
ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
TopicEventHandler func(ctx context.Context, e *TopicEvent) (retry bool, err error)
BulkTopicEventHandler func(ctx context.Context, e []TopicEvent) (retry bool, err error)
BindingInvocationHandler func(ctx context.Context, in *BindingEvent) (out []byte, err error)
HealthCheckHandler func(context.Context) error
)
24 changes: 18 additions & 6 deletions service/grpc/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return s.topicRegistrar.AddSubscription(sub, fn)
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.BulkTopicEventHandler,maxMessagesCount,maxAwaitDurationMs int32) error {
func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
if sub == nil {
return errors.New("subscription required")
}

Check warning on line 43 in service/grpc/topic.go

View check run for this annotation

Codecov / codecov/patch

service/grpc/topic.go#L40-L43

Added lines #L40 - L43 were not covered by tests

return s.topicRegistrar.AddBulkSubscription(sub, fn,maxMessagesCount,maxAwaitDurationMs)
return s.topicRegistrar.AddBulkSubscription(sub, fn, maxMessagesCount, maxAwaitDurationMs)

Check warning on line 45 in service/grpc/topic.go

View check run for this annotation

Codecov / codecov/patch

service/grpc/topic.go#L45

Added line #L45 was not covered by tests
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
Expand All @@ -51,10 +51,11 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &runtimev1pb.TopicSubscription{
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
BulkSubscribe: convertBulkSubscribe(s.BulkSubscribe),
}
subs = append(subs, sub)
}
Expand All @@ -81,6 +82,17 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes {
}
}

func convertBulkSubscribe(bulkSubscribe *internal.BulkSubscribe) *runtimev1pb.BulkSubscribeConfig {
if bulkSubscribe == nil {
return nil
}
return &runtimev1pb.BulkSubscribeConfig{
Enabled: bulkSubscribe.Enabled,
MaxMessagesCount: bulkSubscribe.MaxMessagesCount,
MaxAwaitDurationMs: bulkSubscribe.MaxAwaitDurationMs,
}

Check warning on line 93 in service/grpc/topic.go

View check run for this annotation

Codecov / codecov/patch

service/grpc/topic.go#L89-L93

Added lines #L89 - L93 were not covered by tests
}

// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
Expand Down
61 changes: 23 additions & 38 deletions service/http/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,25 +325,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return nil
}

type BulkSubscribeMessageItem struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Event interface{} `json:"event"`
Metadata map[string]string `json:"metadata"`
ContentType string `json:"contentType,omitempty"`
}

type BulkSubscribeEnvelope struct {
ID string
Entries []BulkSubscribeMessageItem
Metadata map[string]string
Topic string
Pubsub string
EventType string
}

// == APP == the item is {application/cloudevents+json 5e582fd2-f1c4-47bd-81b1-803fb7e86552 map[data:multi-pong datacontenttype:text/plain id:92fc5348-097d-4b9f-b093-7e6fcda77add pubsubname:messages source:pub specversion:1.0 time:2023-12-02T11:42:31+05:30 topic:neworder traceid:00-a0373ef078e14e8db358c06e0ec18b27-d8dae6b5080eb9da-01 traceparent:00-a0373ef078e14e8db358c06e0ec18b27-d8dae6b5080eb9da-01 tracestate: type:com.dapr.event.sent] [] <nil>}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.BulkTopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
if sub == nil {
return errors.New("subscription required")
}

Check warning on line 331 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L328-L331

Added lines #L328 - L331 were not covered by tests
Expand Down Expand Up @@ -376,15 +358,14 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
}

Check warning on line 358 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L355-L358

Added lines #L355 - L358 were not covered by tests

// deserialize the event
var ins BulkSubscribeEnvelope
var ins internal.BulkSubscribeEnvelope
if err = json.Unmarshal(body, &ins); err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}

Check warning on line 365 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L361-L365

Added lines #L361 - L365 were not covered by tests

statuses := make([]BulkSubscribeResponseEntry, 0, len(ins.Entries))

var messages []common.TopicEvent
for _, entry := range ins.Entries {
itemJSON, err := json.Marshal(entry.Event)

Check failure on line 370 in service/http/topic.go

View workflow job for this annotation

GitHub Actions / Test on 1.19

shadow: declaration of "err" shadows declaration at line 346 (govet)

Check failure on line 370 in service/http/topic.go

View workflow job for this annotation

GitHub Actions / Test on 1.20

shadow: declaration of "err" shadows declaration at line 346 (govet)

Check failure on line 370 in service/http/topic.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

shadow: declaration of "err" shadows declaration at line 346 (govet)
if err != nil {
Expand All @@ -405,12 +386,6 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
}
data, rawData := in.getData()

statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Success,
},
)

te := common.TopicEvent{
ID: in.ID,
SpecVersion: in.SpecVersion,
Expand All @@ -425,8 +400,28 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
Topic: in.Topic,
}

messages = append(messages, te)
retry, err := fn(r.Context(), &te)
if err == nil {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Success,
},
)
} else if retry {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Retry,
},
)
} else {
statuses = append(statuses, BulkSubscribeResponseEntry{
EntryId: entry.EntryId,
Status: Drop,
},
)
}

Check warning on line 422 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L367-L422

Added lines #L367 - L422 were not covered by tests
}

resp := BulkSubscribeResponse{
Statuses: statuses,
}
Expand All @@ -437,16 +432,6 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

retry, err := fn(r.Context(), messages)
if err == nil {
writeBulkStatus(w, resp)
return
}

if retry {
writeBulkStatus(w, resp)
return
}
writeBulkStatus(w, resp)

Check warning on line 435 in service/http/topic.go

View check run for this annotation

Codecov / codecov/patch

service/http/topic.go#L425-L435

Added lines #L425 - L435 were not covered by tests
})))

Expand Down
23 changes: 5 additions & 18 deletions service/internal/topicregistrar.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package internal

import (
"context"
"errors"
"fmt"

Expand All @@ -17,9 +16,7 @@ type TopicRegistrar map[string]*TopicRegistration
type TopicRegistration struct {
Subscription *TopicSubscription

Check failure on line 17 in service/internal/topicregistrar.go

View workflow job for this annotation

GitHub Actions / Test on 1.19

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 17 in service/internal/topicregistrar.go

View workflow job for this annotation

GitHub Actions / Test on 1.20

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 17 in service/internal/topicregistrar.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

File is not `gofmt`-ed with `-s` (gofmt)
DefaultHandler common.TopicEventHandler
DefaultBulkHandler common.BulkTopicEventHandler
RouteHandlers map[string]common.TopicEventHandler
BulkRouteHandlers map[string]common.BulkTopicEventHandler
}

func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error {
Expand Down Expand Up @@ -66,7 +63,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
return nil
}

func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.BulkTopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
if sub.Topic == "" {
return errors.New("topic name required")
}
Expand All @@ -90,8 +87,6 @@ func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
RouteHandlers: make(map[string]common.TopicEventHandler),
DefaultHandler: nil,
BulkRouteHandlers: make(map[string]common.BulkTopicEventHandler),
DefaultBulkHandler: nil,
}
ts.Subscription.SetMetadata(sub.Metadata)
m[key] = ts
Expand All @@ -107,19 +102,11 @@ func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.
if err := ts.Subscription.SetDefaultRoute(sub.Route); err != nil {
return err
}

Check warning on line 104 in service/internal/topicregistrar.go

View check run for this annotation

Codecov / codecov/patch

service/internal/topicregistrar.go#L95-L104

Added lines #L95 - L104 were not covered by tests
ts.DefaultBulkHandler = func(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
return false,nil
}
ts.DefaultHandler = func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false,nil
}
}
ts.BulkRouteHandlers[sub.Route] = func(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
return false,nil
}
ts.RouteHandlers[sub.Route] = func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
return false,nil

ts.DefaultHandler = fn

Check warning on line 106 in service/internal/topicregistrar.go

View check run for this annotation

Codecov / codecov/patch

service/internal/topicregistrar.go#L106

Added line #L106 was not covered by tests
}

ts.RouteHandlers[sub.Route] = fn

return nil

Check warning on line 111 in service/internal/topicregistrar.go

View check run for this annotation

Codecov / codecov/patch

service/internal/topicregistrar.go#L109-L111

Added lines #L109 - L111 were not covered by tests
}
17 changes: 17 additions & 0 deletions service/internal/topicsubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ type TopicSubscription struct {
BulkSubscribe *BulkSubscribe `json:"bulkSubscribe,omitempty"`
}

type BulkSubscribeMessageItem struct {
EntryId string `json:"entryId"` //nolint:stylecheck
Event interface{} `json:"event"`
Metadata map[string]string `json:"metadata"`
ContentType string `json:"contentType,omitempty"`
}

type BulkSubscribeEnvelope struct {

Check failure on line 32 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.19

`BulkSubscribeEnvelope` should be annotated with the `json` tag as it is passed to `json.Unmarshal` at service/http/topic.go:362:13 (musttag)

Check failure on line 32 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.20

`BulkSubscribeEnvelope` should be annotated with the `json` tag as it is passed to `json.Unmarshal` at service/http/topic.go:362:13 (musttag)

Check failure on line 32 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

`BulkSubscribeEnvelope` should be annotated with the `json` tag as it is passed to `json.Unmarshal` at service/http/topic.go:362:13 (musttag)
ID string
Entries []BulkSubscribeMessageItem
Metadata map[string]string
Topic string
Pubsub string
EventType string
}

Check failure on line 40 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.19

File is not `gofumpt`-ed (gofumpt)

Check failure on line 40 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.20

File is not `gofumpt`-ed (gofumpt)

Check failure on line 40 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

File is not `gofumpt`-ed (gofumpt)

Check failure on line 41 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.19

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 41 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.20

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 41 in service/internal/topicsubscription.go

View workflow job for this annotation

GitHub Actions / Test on 1.21

File is not `gofmt`-ed with `-s` (gofmt)
type BulkSubscribe struct {
Enabled bool `json:"enabled"`
MaxMessagesCount int32 `json:"maxMessagesCount,omitempty"`
Expand Down

0 comments on commit cb26d03

Please sign in to comment.