Commit 1628a4b

feat(broadcast): added retries to channel
Parent: 0c91d0b

File tree

4 files changed (+80 -14 lines)

channel.go

+56 -8
@@ -20,9 +20,10 @@ import (
 var streamDownErr = status.Error(codes.Unavailable, "stream is down")
 
 type request struct {
-	ctx  context.Context
-	msg  *Message
-	opts callOptions
+	ctx       context.Context
+	msg       *Message
+	opts      callOptions
+	numFailed int
 }
 
 // waitForSend returns true if the WithNoSendWaiting call option is not set.
@@ -60,6 +61,7 @@ type channel struct {
 	cancelStream    context.CancelFunc
 	responseRouters map[uint64]responseRouter
 	responseMut     sync.Mutex
+	maxRetries      int // number of times we try to resend a failed msg
 }
 
 // newChannel creates a new channel for the given node and starts the sending goroutine.
@@ -76,6 +78,7 @@ func newChannel(n *RawNode) *channel {
 		latency:         -1 * time.Second,
 		rand:            rand.New(rand.NewSource(time.Now().UnixNano())),
 		responseRouters: make(map[uint64]responseRouter),
+		maxRetries:      n.mgr.opts.maxRetries,
 	}
 	// parentCtx controls the channel and is used to shut it down
 	c.parentCtx = n.newContext()
@@ -227,10 +230,21 @@ func (c *channel) sendMsg(req request) (err error) {
 		case <-done:
 			// false alarm
 		default:
+			// CANCELLING HERE CAN HAVE DESTRUCTIVE EFFECTS!
+			// Imagine the client has sent several requests and is waiting
+			// for a response on each individual request. Furthermore, let's
+			// say the client has sent a message to two different handlers:
+			//  1. A handler that does a lot of work and thus long response times are expected.
+			//  2. A handler that is normally very fast.
+			//
+			// If the client is impatient and cancels a request sent to a handler in scenario 2,
+			// then all requests sent to the handler in scenario 1 will also be cancelled because
+			// the stream is taken down.
+
 			// trigger reconnect
-			c.streamMut.Lock()
-			c.cancelStream()
-			c.streamMut.Unlock()
+			//c.streamMut.Lock()
+			//c.cancelStream()
+			//c.streamMut.Unlock()
 		}
 	}()
@@ -266,14 +280,16 @@ func (c *channel) sender() {
 		}
 		// return error if stream is broken
 		if c.streamBroken.get() {
-			c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: streamDownErr})
+			//c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: streamDownErr})
+			go c.retryMsg(req, streamDownErr)
 			continue
 		}
 		// else try to send message
 		err := c.sendMsg(req)
 		if err != nil {
 			// return the error
-			c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: err})
+			//c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: err})
+			go c.retryMsg(req, err)
 		}
 	}
 }
@@ -396,6 +412,38 @@ func (c *channel) reconnect(maxRetries float64) {
 	}
 }
 
+// This method should always be run in a goroutine. It will
+// enqueue a msg if it has previously failed. The message will
+// be dropped if it fails more than maxRetries or if the ctx
+// is cancelled.
+func (c *channel) retryMsg(req request, err error) {
+	req.numFailed++
+	// c.maxRetries = -1 is the same as infinite retries.
+	if req.numFailed > c.maxRetries && c.maxRetries != -1 {
+		c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: fmt.Errorf("max retries exceeded. err=%w", err)})
+		return
+	}
+	//delay := float64(c.backoffCfg.BaseDelay)
+	delay := float64(10 * time.Millisecond)
+	max := float64(c.backoffCfg.MaxDelay)
+	for r := req.numFailed; delay < max && r > 0; r-- {
+		delay *= c.backoffCfg.Multiplier
+	}
+	delay = math.Min(delay, max)
+	delay *= 1 + c.backoffCfg.Jitter*(rand.Float64()*2-1)
+	select {
+	case <-c.parentCtx.Done():
+		c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: fmt.Errorf("channel closed")})
+		return
+	case <-req.ctx.Done():
+		c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: fmt.Errorf("context cancelled")})
+		return
+	case <-time.After(time.Duration(delay)):
+		// enqueue the request again
+	}
+	c.enqueueSlow(req)
+}
+
 func (c *channel) setLastErr(err error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
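
Because retryMsg sleeps for the computed backoff before re-enqueueing, the sender loop dispatches it with go c.retryMsg(...) so other queued messages are not held up. For reference, below is a minimal standalone sketch of the delay computation; the MaxDelay, Multiplier and Jitter values are illustrative placeholders, not taken from this commit.

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// Illustrative parameters; the channel reads MaxDelay, Multiplier and Jitter
// from its backoff config, and the base delay is fixed at 10ms in this commit
// (BaseDelay is commented out).
const (
	baseDelay  = 10 * time.Millisecond
	maxDelay   = 120 * time.Second // assumed value
	multiplier = 1.6               // assumed value
	jitter     = 0.2               // assumed value
)

// retryDelay mirrors the delay computation in retryMsg for a message
// that has already failed numFailed times.
func retryDelay(numFailed int) time.Duration {
	delay := float64(baseDelay)
	max := float64(maxDelay)
	for r := numFailed; delay < max && r > 0; r-- {
		delay *= multiplier
	}
	delay = math.Min(delay, max)
	delay *= 1 + jitter*(rand.Float64()*2-1)
	return time.Duration(delay)
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: ~%v\n", attempt, retryDelay(attempt))
	}
}

The delay grows geometrically with the number of failures and is capped at the maximum, while the jitter term spreads retries out so that many failed messages do not all hit the stream at the same instant.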

clientserver.go

+5 -2
@@ -79,8 +79,11 @@ func (srv *ClientServer) AddRequest(broadcastID uint64, clientCtx context.Context
 		IsBroadcastClient: true,
 		OriginAddr:        srv.lis.Addr().String(),
 	}
-	doneChan := make(chan protoreflect.ProtoMessage)
-	respChan := make(chan protoreflect.ProtoMessage)
+	// we expect one response when we are done
+	doneChan := make(chan protoreflect.ProtoMessage, 1)
+	// we should buffer this channel according to the number of servers.
+	// most configs hopefully contain less than 7 servers.
+	respChan := make(chan protoreflect.ProtoMessage, 7)
 	ctx, cancel := context.WithCancel(srv.ctx)
 
 	srv.mu.Lock()
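
The capacity choices follow from Go channel semantics: a send on an unbuffered channel blocks until a receiver is ready, while a buffered channel accepts up to its capacity without one. A small self-contained illustration (not from this commit; the names are made up):

package main

import "fmt"

func main() {
	// With capacity 7, up to seven sends complete immediately even if nobody
	// is receiving yet; an unbuffered channel would block on the very first
	// send until a receiver arrived.
	respChan := make(chan string, 7)
	for i := 0; i < 7; i++ {
		respChan <- fmt.Sprintf("reply %d", i) // does not block
	}
	close(respChan)
	for r := range respChan {
		fmt.Println(r)
	}
}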

opts.go

+18 -3
@@ -19,8 +19,9 @@ type managerOptions struct {
 	sendBuffer uint
 	metadata   metadata.MD
 	perNodeMD  func(uint32) metadata.MD
-	publicKey  string
-	machineID  uint64
+	publicKey  string // used when authenticating msgs
+	machineID  uint64 // used for generating SnowflakeIDs
+	maxRetries int    // number of times we try to resend a failed msg
 }
 
 func newManagerOptions() managerOptions {
@@ -30,7 +31,8 @@ func newManagerOptions() managerOptions {
 		nodeDialTimeout: 50 * time.Millisecond,
 		// Provide an illegal machineID to avoid unintentional collisions.
 		// 0 is a valid MachineID and should not be used as default.
-		machineID: uint64(broadcast.MaxMachineID) + 1,
+		machineID:  uint64(broadcast.MaxMachineID) + 1,
+		maxRetries: 0,
 	}
 }
 
@@ -108,8 +110,21 @@ func WithPublicKey(publicKey string) ManagerOption {
 	}
 }
 
+// WithMachineID returns a ManagerOption that allows you to set a unique ID for the client.
+// This ID will be embedded in broadcast requests sent from the client, making the requests
+// trackable by the whole cluster. A random ID will be generated if not set. This can cause
+// collisions if there are many clients. MaxID = 4096.
 func WithMachineID(id uint64) ManagerOption {
 	return func(o *managerOptions) {
 		o.machineID = id
 	}
 }
+
+// WithRetries returns a ManagerOption that allows you to specify how many times the node
+// will try to send a message. A message will be dropped if sending it fails more than
+// the specified number of times.
+func WithRetries(maxRetries int) ManagerOption {
+	return func(o *managerOptions) {
+		o.maxRetries = maxRetries
+	}
+}
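
A hedged usage sketch of the new option follows. NewRawManager and WithGrpcDialOptions are assumed from gorums' public API and are not part of this commit; only WithRetries is introduced here.

package main

import (
	"github.com/relab/gorums"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// WithRetries(3) re-enqueues a failed message (with backoff) up to three
	// times before dropping it; -1 retries indefinitely; the default (0)
	// keeps the old behaviour of failing a message on its first send error.
	mgr := gorums.NewRawManager(
		gorums.WithGrpcDialOptions(
			grpc.WithTransportCredentials(insecure.NewCredentials()),
		),
		gorums.WithRetries(3),
	)
	defer mgr.Close()
}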

tests/broadcast/broadcast_gorums.pb.go

+1 -1

Generated file; diff not rendered.
