Skip to content

Commit e974ea8

Browse files
authored
add deadlettertopic support to non-streaming subscriptions (#685)
* add deadlettertopic support to non-streaming subscriptions Signed-off-by: yaron2 <[email protected]> * fix tests Signed-off-by: yaron2 <[email protected]> --------- Signed-off-by: yaron2 <[email protected]>
1 parent 3833d13 commit e974ea8

File tree

5 files changed

+20
-14
lines changed

5 files changed

+20
-14
lines changed

service/common/type.go

+2
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ type Subscription struct {
107107
Priority int `json:"priority"`
108108
// DisableTopicValidation allows to receive events from publisher topics that differ from the subscribed topic.
109109
DisableTopicValidation bool `json:"disableTopicValidation"`
110+
// DeadLetterTopic is the name of the deadletter topic.
111+
DeadLetterTopic string `json:"deadLetterTopic"`
110112
}
111113

112114
type SubscriptionResponseStatus string

service/grpc/topic.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *emptypb.Empty)
5353
for _, v := range s.topicRegistrar {
5454
s := v.Subscription
5555
sub := &runtimev1pb.TopicSubscription{
56-
PubsubName: s.PubsubName,
57-
Topic: s.Topic,
58-
Metadata: s.Metadata,
59-
Routes: convertRoutes(s.Routes),
56+
PubsubName: s.PubsubName,
57+
Topic: s.Topic,
58+
Metadata: s.Metadata,
59+
Routes: convertRoutes(s.Routes),
60+
DeadLetterTopic: s.DeadLetterTopic,
6061
}
6162
subs = append(subs, sub)
6263
}

service/internal/topicregistrar.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
3939
ts, ok := m[key]
4040
if !ok {
4141
ts = &TopicRegistration{
42-
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic),
42+
Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic, sub.DeadLetterTopic),
4343
RouteHandlers: make(map[string]common.TopicEventSubscriber),
4444
DefaultHandler: nil,
4545
}

service/internal/topicsubscription.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type TopicSubscription struct {
1818
Routes *TopicRoutes `json:"routes,omitempty"`
1919
// Metadata is the subscription metadata.
2020
Metadata map[string]string `json:"metadata,omitempty"`
21+
// DeadLetterTopic is the name of the deadletter topic.
22+
DeadLetterTopic string `json:"deadLetterTopic"`
2123
}
2224

2325
// TopicRoutes encapsulates the default route and multiple routing rules.
@@ -42,10 +44,11 @@ type TopicRule struct {
4244
}
4345

4446
// NewTopicSubscription creates a new `TopicSubscription`.
45-
func NewTopicSubscription(pubsubName, topic string) *TopicSubscription {
47+
func NewTopicSubscription(pubsubName, topic, deadLetterTopic string) *TopicSubscription {
4648
return &TopicSubscription{ //nolint:exhaustivestruct
47-
PubsubName: pubsubName,
48-
Topic: topic,
49+
PubsubName: pubsubName,
50+
Topic: topic,
51+
DeadLetterTopic: deadLetterTopic,
4952
}
5053
}
5154

service/internal/topicsubscription_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
func TestTopicSubscripiton(t *testing.T) {
1414
t.Run("duplicate metadata", func(t *testing.T) {
15-
sub := internal.NewTopicSubscription("test", "mytopic")
15+
sub := internal.NewTopicSubscription("test", "mytopic", "")
1616
require.NoError(t, sub.SetMetadata(map[string]string{
1717
"test": "test",
1818
}))
@@ -22,23 +22,23 @@ func TestTopicSubscripiton(t *testing.T) {
2222
})
2323

2424
t.Run("duplicate route", func(t *testing.T) {
25-
sub := internal.NewTopicSubscription("test", "mytopic")
25+
sub := internal.NewTopicSubscription("test", "mytopic", "")
2626
require.NoError(t, sub.SetDefaultRoute("/test"))
2727
assert.Equal(t, "/test", sub.Route)
2828
require.EqualError(t, sub.SetDefaultRoute("/test"),
2929
"subscription for topic mytopic on pubsub test already has route /test")
3030
})
3131

3232
t.Run("duplicate route after routing rule", func(t *testing.T) {
33-
sub := internal.NewTopicSubscription("test", "mytopic")
33+
sub := internal.NewTopicSubscription("test", "mytopic", "")
3434
require.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0))
3535
require.NoError(t, sub.SetDefaultRoute("/test"))
3636
require.EqualError(t, sub.SetDefaultRoute("/test"),
3737
"subscription for topic mytopic on pubsub test already has route /test")
3838
})
3939

4040
t.Run("default route after routing rule", func(t *testing.T) {
41-
sub := internal.NewTopicSubscription("test", "mytopic")
41+
sub := internal.NewTopicSubscription("test", "mytopic", "")
4242
require.NoError(t, sub.SetDefaultRoute("/test"))
4343
assert.Equal(t, "/test", sub.Route)
4444
require.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0))
@@ -49,14 +49,14 @@ func TestTopicSubscripiton(t *testing.T) {
4949
})
5050

5151
t.Run("duplicate routing rule priority", func(t *testing.T) {
52-
sub := internal.NewTopicSubscription("test", "mytopic")
52+
sub := internal.NewTopicSubscription("test", "mytopic", "")
5353
require.NoError(t, sub.AddRoutingRule("/other", `event.type == "other"`, 1))
5454
require.EqualError(t, sub.AddRoutingRule("/test", `event.type == "test"`, 1),
5555
"subscription for topic mytopic on pubsub test already has a routing rule with priority 1")
5656
})
5757

5858
t.Run("priority ordering", func(t *testing.T) {
59-
sub := internal.NewTopicSubscription("test", "mytopic")
59+
sub := internal.NewTopicSubscription("test", "mytopic", "")
6060
require.NoError(t, sub.AddRoutingRule("/100", `event.type == "100"`, 100))
6161
require.NoError(t, sub.AddRoutingRule("/1", `event.type == "1"`, 1))
6262
require.NoError(t, sub.AddRoutingRule("/50", `event.type == "50"`, 50))

0 commit comments

Comments
 (0)