@@ -23,6 +23,9 @@ import (
23
23
"sync"
24
24
"sync/atomic"
25
25
26
+ "google.golang.org/grpc/codes"
27
+ "google.golang.org/grpc/status"
28
+
26
29
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
27
30
"github.com/dapr/go-sdk/service/common"
28
31
)
@@ -37,10 +40,15 @@ type SubscriptionOptions struct {
37
40
}
38
41
39
42
type Subscription struct {
43
+ ctx context.Context
40
44
stream pb.Dapr_SubscribeTopicEventsAlpha1Client
45
+
41
46
// lock locks concurrent writes to subscription stream.
42
47
lock sync.Mutex
43
48
closed atomic.Bool
49
+
50
+ createStream func (ctx context.Context , opts SubscriptionOptions ) (pb.Dapr_SubscribeTopicEventsAlpha1Client , error )
51
+ opts SubscriptionOptions
44
52
}
45
53
46
54
type SubscriptionMessage struct {
@@ -55,7 +63,10 @@ func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*
55
63
}
56
64
57
65
s := & Subscription {
58
- stream : stream ,
66
+ ctx : ctx ,
67
+ stream : stream ,
68
+ createStream : c .subscribeInitialRequest ,
69
+ opts : opts ,
59
70
}
60
71
61
72
return s , nil
@@ -101,52 +112,96 @@ func (s *Subscription) Close() error {
101
112
}
102
113
103
114
func (s * Subscription ) Receive () (* SubscriptionMessage , error ) {
104
- resp , err := s .stream .Recv ()
105
- if err != nil {
106
- return nil , err
107
- }
108
- event := resp .GetEventMessage ()
109
-
110
- data := any (event .GetData ())
111
- if len (event .GetData ()) > 0 {
112
- mediaType , _ , err := mime .ParseMediaType (event .GetDataContentType ())
113
- if err == nil {
114
- var v interface {}
115
- switch mediaType {
116
- case "application/json" :
117
- if err := json .Unmarshal (event .GetData (), & v ); err == nil {
118
- data = v
115
+ for {
116
+ resp , err := s .stream .Recv ()
117
+ if err != nil {
118
+ select {
119
+ case <- s .ctx .Done ():
120
+ return nil , errors .New ("subscription context closed" )
121
+ default :
122
+ // proceed to check the gRPC status error
123
+ }
124
+
125
+ st , ok := status .FromError (err )
126
+ if ! ok {
127
+ // not a grpc status error
128
+ return nil , err
129
+ }
130
+
131
+ switch st .Code () {
132
+ case codes .Unavailable , codes .Unknown :
133
+ logger .Printf ("gRPC error while reading from stream: %s (code=%v)" ,
134
+ st .Message (), st .Code ())
135
+ // close the current stream and reconnect
136
+ if s .closed .Load () {
137
+ return nil , errors .New ("subscription is permanently closed; cannot reconnect" )
138
+ }
139
+ if err := s .closeStreamOnly (); err != nil {
140
+ logger .Printf ("error closing current stream: %v" , err )
119
141
}
120
- case "text/plain" :
121
- // Assume UTF-8 encoded string.
122
- data = string (event .GetData ())
142
+
143
+ newStream , nerr := s .createStream (s .ctx , s .opts )
144
+ if nerr != nil {
145
+ return nil , errors .New ("re-subscribe failed" )
146
+ }
147
+
148
+ s .lock .Lock ()
149
+ s .stream = newStream
150
+ s .lock .Unlock ()
151
+
152
+ // try receiving again
153
+ continue
154
+
155
+ case codes .Canceled :
156
+ return nil , errors .New ("stream canceled" )
157
+
123
158
default :
124
- if strings .HasPrefix (mediaType , "application/" ) &&
125
- strings .HasSuffix (mediaType , "+json" ) {
159
+ return nil , errors .New ("subscription recv error" )
160
+ }
161
+ }
162
+
163
+ event := resp .GetEventMessage ()
164
+ data := any (event .GetData ())
165
+ if len (event .GetData ()) > 0 {
166
+ mediaType , _ , err := mime .ParseMediaType (event .GetDataContentType ())
167
+ if err == nil {
168
+ var v interface {}
169
+ switch mediaType {
170
+ case "application/json" :
126
171
if err := json .Unmarshal (event .GetData (), & v ); err == nil {
127
172
data = v
128
173
}
174
+ case "text/plain" :
175
+ // Assume UTF-8 encoded string.
176
+ data = string (event .GetData ())
177
+ default :
178
+ if strings .HasPrefix (mediaType , "application/" ) &&
179
+ strings .HasSuffix (mediaType , "+json" ) {
180
+ if err := json .Unmarshal (event .GetData (), & v ); err == nil {
181
+ data = v
182
+ }
183
+ }
129
184
}
130
185
}
131
186
}
132
- }
133
187
134
- topicEvent := & common.TopicEvent {
135
- ID : event .GetId (),
136
- Source : event .GetSource (),
137
- Type : event .GetType (),
138
- SpecVersion : event .GetSpecVersion (),
139
- DataContentType : event .GetDataContentType (),
140
- Data : data ,
141
- RawData : event .GetData (),
142
- Topic : event .GetTopic (),
143
- PubsubName : event .GetPubsubName (),
144
- }
188
+ topicEvent := & common.TopicEvent {
189
+ ID : event .GetId (),
190
+ Source : event .GetSource (),
191
+ Type : event .GetType (),
192
+ SpecVersion : event .GetSpecVersion (),
193
+ DataContentType : event .GetDataContentType (),
194
+ Data : data ,
195
+ RawData : event .GetData (),
196
+ Topic : event .GetTopic (),
197
+ PubsubName : event .GetPubsubName (),
198
+ }
145
199
146
- return & SubscriptionMessage {
147
- sub : s ,
148
- TopicEvent : topicEvent ,
149
- }, nil
200
+ return & SubscriptionMessage {
201
+ sub : s ,
202
+ TopicEvent : topicEvent ,
203
+ }, nil
204
+ }
150
205
}
151
206
152
207
func (s * SubscriptionMessage ) Success () error {
@@ -232,3 +287,13 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript
232
287
233
288
return stream , nil
234
289
}
290
+
291
+ func (s * Subscription ) closeStreamOnly () error {
292
+ s .lock .Lock ()
293
+ defer s .lock .Unlock ()
294
+
295
+ if s .stream != nil {
296
+ return s .stream .CloseSend ()
297
+ }
298
+ return nil
299
+ }
0 commit comments