Skip to content

Commit 797e5ee

Browse files
fix(broadcast): checks whether a client req has arrived immediately
1 parent 9c20ba7 commit 797e5ee

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

broadcast/processor.go

+24
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,30 @@ func (p *BroadcastProcessor) handle(msg *Content) {
5757
p.initOrder()
5858
// connect to client immediately to potentially save some time
5959
go p.router.Connect(metadata.OriginAddr)
60+
if msg.IsBroadcastClient {
61+
// important to set this option to prevent duplicate client reqs.
62+
// this can be the result if a server forwards the req but the
63+
// leader has already received the client req.
64+
metadata.HasReceivedClientReq = true
65+
go func() {
66+
// new.Ctx will correspond to the streamCtx between the client and this server.
67+
// We can thus listen to it and signal a cancellation if the client goes offline
68+
// or cancels the request. We also have to listen to the p.ctx to prevent leaking
69+
// the goroutine.
70+
select {
71+
case <-p.ctx.Done():
72+
case <-msg.Ctx.Done():
73+
}
74+
// when the processor returns it sets p.cancellationCtxCancel = nil.
75+
// Hence, it is important to check if it is nil. Also, the processor
76+
// calls this function when it returns.
77+
if p.cancellationCtxCancel != nil {
78+
p.cancellationCtxCancel()
79+
}
80+
}()
81+
p.log("msg: received client req", nil, logging.Method(msg.CurrentMethod), logging.From(msg.SenderAddr))
82+
}
83+
// all messages should always have a ReceiveChan
6084
if msg.ReceiveChan != nil {
6185
if !p.isInOrder(msg.CurrentMethod) {
6286
// save the message and execute it later

0 commit comments

Comments
 (0)