@@ -2,6 +2,7 @@ package gorums
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
"math"
6
7
"math/rand"
7
8
"sync"
@@ -35,7 +36,7 @@ type responseRouter struct {
35
36
36
37
type channel struct {
37
38
sendQ chan request
38
- nodeID uint32
39
+ node * RawNode
39
40
mu sync.Mutex
40
41
lastError error
41
42
latency time.Duration
@@ -50,29 +51,39 @@ type channel struct {
50
51
cancelStream context.CancelFunc
51
52
responseRouters map [uint64 ]responseRouter
52
53
responseMut sync.Mutex
54
+ connEstablished bool
53
55
}
54
56
55
57
func newChannel (n * RawNode ) * channel {
56
58
return & channel {
57
59
sendQ : make (chan request , n .mgr .opts .sendBuffer ),
58
60
backoffCfg : n .mgr .opts .backoff ,
59
- nodeID : n . ID () ,
61
+ node : n ,
60
62
latency : - 1 * time .Second ,
61
63
rand : rand .New (rand .NewSource (time .Now ().UnixNano ())),
62
64
responseRouters : make (map [uint64 ]responseRouter ),
65
+ connEstablished : false ,
63
66
}
64
67
}
65
68
66
69
func (c * channel ) connect (ctx context.Context , conn * grpc.ClientConn ) error {
67
- var err error
68
70
c .parentCtx = ctx
71
+ go c .sendMsgs ()
72
+ if conn == nil {
73
+ return fmt .Errorf ("connection is nil" )
74
+ }
75
+ return c .tryConnect (conn )
76
+ }
77
+
78
+ func (c * channel ) tryConnect (conn * grpc.ClientConn ) error {
79
+ var err error
69
80
c .streamCtx , c .cancelStream = context .WithCancel (c .parentCtx )
70
81
c .gorumsClient = ordering .NewGorumsClient (conn )
71
82
c .gorumsStream , err = c .gorumsClient .NodeStream (c .streamCtx )
72
83
if err != nil {
73
84
return err
74
85
}
75
- go c . sendMsgs ()
86
+ c . connEstablished = true
76
87
go c .recvMsgs ()
77
88
return nil
78
89
}
@@ -160,17 +171,23 @@ func (c *channel) sendMsgs() {
160
171
return
161
172
case req = <- c .sendQ :
162
173
}
174
+ // try to connect to the node if previous attempts
175
+ // have failed or if the node has disconnected
176
+ if ! c .isConnected () {
177
+ // streamBroken will be set if the connection fails
178
+ c .tryReconnect ()
179
+ }
163
180
// return error if stream is broken
164
181
if c .streamBroken .get () {
165
182
err := status .Errorf (codes .Unavailable , "stream is down" )
166
- c .routeResponse (req .msg .Metadata .MessageID , response {nid : c .nodeID , msg : nil , err : err })
183
+ c .routeResponse (req .msg .Metadata .MessageID , response {nid : c .node . ID () , msg : nil , err : err })
167
184
continue
168
185
}
169
186
// else try to send message
170
187
err := c .sendMsg (req )
171
188
if err != nil {
172
189
// return the error
173
- c .routeResponse (req .msg .Metadata .MessageID , response {nid : c .nodeID , msg : nil , err : err })
190
+ c .routeResponse (req .msg .Metadata .MessageID , response {nid : c .node . ID () , msg : nil , err : err })
174
191
}
175
192
}
176
193
}
@@ -189,7 +206,7 @@ func (c *channel) recvMsgs() {
189
206
} else {
190
207
c .streamMut .RUnlock ()
191
208
err := status .FromProto (resp .Metadata .GetStatus ()).Err ()
192
- c .routeResponse (resp .Metadata .MessageID , response {nid : c .nodeID , msg : resp .Message , err : err })
209
+ c .routeResponse (resp .Metadata .MessageID , response {nid : c .node . ID () , msg : resp .Message , err : err })
193
210
}
194
211
195
212
select {
@@ -200,11 +217,37 @@ func (c *channel) recvMsgs() {
200
217
}
201
218
}
202
219
203
- func (c * channel ) reconnect () {
220
+ func (c * channel ) tryReconnect () {
221
+ // a connection has never been established
222
+ if ! c .connEstablished {
223
+ err := c .node .dial ()
224
+ if err != nil {
225
+ c .streamBroken .set ()
226
+ return
227
+ }
228
+ err = c .tryConnect (c .node .conn )
229
+ if err != nil {
230
+ c .streamBroken .set ()
231
+ return
232
+ }
233
+ }
234
+ // the node has previously been connected
235
+ // but is now disconnected
236
+ if c .streamBroken .get () {
237
+ // try to reconnect only once
238
+ c .reconnect (1 )
239
+ }
240
+ }
241
+
242
+ func (c * channel ) reconnect (maxRetries ... int ) {
204
243
c .streamMut .Lock ()
205
244
defer c .streamMut .Unlock ()
206
245
backoffCfg := c .backoffCfg
207
246
247
+ var maxretries float64 = - 1
248
+ if len (maxRetries ) > 0 {
249
+ maxretries = float64 (maxRetries [0 ])
250
+ }
208
251
var retries float64
209
252
for {
210
253
var err error
@@ -217,6 +260,10 @@ func (c *channel) reconnect() {
217
260
}
218
261
c .cancelStream ()
219
262
c .setLastErr (err )
263
+ if retries >= maxretries && maxretries > 0 {
264
+ c .streamBroken .set ()
265
+ return
266
+ }
220
267
delay := float64 (backoffCfg .BaseDelay )
221
268
max := float64 (backoffCfg .MaxDelay )
222
269
for r := retries ; delay < max && r > 0 ; r -- {
@@ -257,6 +304,10 @@ type atomicFlag struct {
257
304
flag int32
258
305
}
259
306
307
+ func (c * channel ) isConnected () bool {
308
+ return c .connEstablished && ! c .streamBroken .get ()
309
+ }
310
+
260
311
// set atomically stores 1 in the flag.
func (f *atomicFlag) set() {
	atomic.StoreInt32(&f.flag, 1)
}
261
312
// get atomically loads the flag and reports whether it equals 1.
func (f *atomicFlag) get() bool {
	current := atomic.LoadInt32(&f.flag)
	return current == 1
}
262
313
// clear atomically stores 0 in the flag.
func (f *atomicFlag) clear() {
	atomic.StoreInt32(&f.flag, 0)
}
0 commit comments