Skip to content

Commit eb87024

Browse files
chore(broadcast): better logging
1 parent c7f38ef commit eb87024

File tree

2 files changed

+39
-27
lines changed

2 files changed

+39
-27
lines changed

broadcast/processor.go

+26-20
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ func (p *BroadcastProcessor) handle(msg Content) {
6262
msg.ReceiveChan <- shardResponse{
6363
err: OutOfOrderErr{},
6464
}
65-
p.log("msg: out of order", "err", OutOfOrderErr{}, "method", msg.CurrentMethod, "from", msg.SenderAddr)
65+
p.log("msg: out of order", OutOfOrderErr{}, "method", msg.CurrentMethod, "from", msg.SenderAddr)
6666
} else {
6767
msg.ReceiveChan <- shardResponse{
6868
err: nil,
6969
reqCtx: p.cancellationCtx,
7070
enqueueBroadcast: p.enqueueBroadcast,
7171
}
72-
p.log("msg: processed", "err", nil, "method", msg.CurrentMethod, "from", msg.SenderAddr)
72+
p.log("msg: processed", nil, "method", msg.CurrentMethod, "from", msg.SenderAddr)
7373
}
7474
}
7575
defer func() {
@@ -83,15 +83,15 @@ func (p *BroadcastProcessor) handle(msg Content) {
8383
//close(p.broadcastChan)
8484
//close(p.sendChan)
8585
p.emptyChannels(metadata)
86-
p.log("processor stopped", "err", nil, "started", p.started, "ended", p.ended)
86+
p.log("processor stopped", nil, "started", p.started, "ended", p.ended)
8787
}()
8888
for {
8989
select {
9090
case <-p.ctx.Done():
9191
return
9292
case bMsg := <-p.broadcastChan:
9393
if p.broadcastID != bMsg.BroadcastID {
94-
p.log("broadcast: wrong BroadcastID", "err", BroadcastIDErr{}, "type", bMsg.MsgType.String(), "stopping", false)
94+
p.log("broadcast: wrong BroadcastID", BroadcastIDErr{}, "type", bMsg.MsgType.String(), "stopping", false)
9595
continue
9696
}
9797
switch bMsg.MsgType {
@@ -116,7 +116,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
116116
new.ReceiveChan <- shardResponse{
117117
err: BroadcastIDErr{},
118118
}
119-
p.log("msg: wrong BroadcastID", "err", BroadcastIDErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
119+
p.log("msg: wrong BroadcastID", BroadcastIDErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
120120
continue
121121
}
122122
if new.IsCancellation {
@@ -126,7 +126,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
126126
new.ReceiveChan <- shardResponse{
127127
err: nil,
128128
}
129-
p.log("msg: received cancellation", "err", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
129+
p.log("msg: received cancellation", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
130130
continue
131131
}
132132

@@ -137,7 +137,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
137137
new.ReceiveChan <- shardResponse{
138138
err: ClientReqAlreadyReceivedErr{},
139139
}
140-
p.log("msg: duplicate client req", "err", ClientReqAlreadyReceivedErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
140+
p.log("msg: duplicate client req", ClientReqAlreadyReceivedErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
141141
continue
142142
}
143143
// important to set this option to prevent duplicate client reqs.
@@ -155,7 +155,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
155155
}
156156
p.cancellationCtxCancel()
157157
}()
158-
p.log("msg: received client req", "err", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
158+
p.log("msg: received client req", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
159159
}
160160

161161
metadata.update(new)
@@ -174,7 +174,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
174174
err: err,
175175
}
176176
// slog.Info("receive: late", "err", err, "id", p.broadcastID)
177-
p.log("msg: late msg", "err", err, "method", new.CurrentMethod, "from", new.SenderAddr)
177+
p.log("msg: late msg", err, "method", new.CurrentMethod, "from", new.SenderAddr)
178178
return
179179
}
180180
if !p.isInOrder(new.CurrentMethod) {
@@ -183,26 +183,26 @@ func (p *BroadcastProcessor) handle(msg Content) {
183183
new.ReceiveChan <- shardResponse{
184184
err: OutOfOrderErr{},
185185
}
186-
p.log("msg: out of order", "err", OutOfOrderErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
186+
p.log("msg: out of order", OutOfOrderErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
187187
continue
188188
}
189189
new.ReceiveChan <- shardResponse{
190190
err: nil,
191191
reqCtx: p.cancellationCtx,
192192
enqueueBroadcast: p.enqueueBroadcast,
193193
}
194-
p.log("msg: processed", "err", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
194+
p.log("msg: processed", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
195195
}
196196
}
197197
}
198198

199199
func (p *BroadcastProcessor) handleCancellation(bMsg Msg, metadata *metadata) bool {
200200
if bMsg.Cancellation.end {
201-
p.log("broadcast: broadcast.Done() called", "err", nil, "type", bMsg.MsgType.String(), "stopping", true)
201+
p.log("broadcast: broadcast.Done() called", nil, "type", bMsg.MsgType.String(), "stopping", true)
202202
return true
203203
}
204204
if !metadata.SentCancellation {
205-
p.log("broadcast: sent cancellation", "err", nil, "type", bMsg.MsgType.String(), "stopping", false)
205+
p.log("broadcast: sent cancellation", nil, "type", bMsg.MsgType.String(), "stopping", false)
206206
metadata.SentCancellation = true
207207
go p.router.Send(p.broadcastID, "", "", bMsg.Cancellation)
208208
}
@@ -216,16 +216,22 @@ func (p *BroadcastProcessor) handleBroadcast(bMsg Msg, methods []string, metadat
216216
return false
217217
}
218218
p.router.Send(p.broadcastID, metadata.OriginAddr, metadata.OriginMethod, bMsg.Msg)
219-
p.log("broadcast: sending broadcast", "err", nil, "type", bMsg.MsgType.String(), "stopping", false, "isBroadcastCall", metadata.isBroadcastCall())
219+
p.log("broadcast: sending broadcast", nil, "type", bMsg.MsgType.String(), "method", bMsg.Method, "stopping", false, "isBroadcastCall", metadata.isBroadcastCall())
220220

221221
p.updateOrder(bMsg.Method)
222222
p.dispatchOutOfOrderMsgs()
223223
return true
224224
}
225225

226-
func (p *BroadcastProcessor) log(msg string, args ...any) {
226+
func (p *BroadcastProcessor) log(msg string, err error, args ...any) {
227227
if p.logger != nil {
228-
p.logger.Debug(msg, args...)
228+
if err != nil {
229+
args = append(args, "err", err.Error())
230+
p.logger.Error(msg, args...)
231+
} else {
232+
args = append(args, "err", nil)
233+
p.logger.Info(msg, args...)
234+
}
229235
}
230236
}
231237

@@ -234,7 +240,7 @@ func (p *BroadcastProcessor) handleReply(bMsg Msg, metadata *metadata) bool {
234240
if metadata.isBroadcastCall() {
235241
go p.router.Send(p.broadcastID, metadata.OriginAddr, metadata.OriginMethod, bMsg.Reply)
236242
// the request is done becuase we have sent a reply to the client
237-
p.log("broadcast: sending reply to client", "err", nil, "type", bMsg.MsgType.String(), "stopping", true, "isBroadcastCall", metadata.isBroadcastCall())
243+
p.log("broadcast: sending reply to client", nil, "type", bMsg.MsgType.String(), "stopping", true, "isBroadcastCall", metadata.isBroadcastCall())
238244
return true
239245
}
240246
// QuorumCall if origin addr is empty.
@@ -254,11 +260,11 @@ func (p *BroadcastProcessor) handleReply(bMsg Msg, metadata *metadata) bool {
254260
// the request is not done yet because we have not replied to
255261
// the client.
256262
//slog.Info("reply: late", "err", err, "id", p.broadcastID)
257-
p.log("broadcast: failed to send reply to client", "err", err, "type", bMsg.MsgType.String(), "stopping", false, "isBroadcastCall", metadata.isBroadcastCall())
263+
p.log("broadcast: failed to send reply to client", err, "type", bMsg.MsgType.String(), "stopping", false, "isBroadcastCall", metadata.isBroadcastCall())
258264
return false
259265
}
260266
// the request is done becuase we have sent a reply to the client
261-
p.log("broadcast: sending reply to client", "err", err, "type", bMsg.MsgType.String(), "stopping", true, "isBroadcastCall", metadata.isBroadcastCall())
267+
p.log("broadcast: sending reply to client", err, "type", bMsg.MsgType.String(), "stopping", true, "isBroadcastCall", metadata.isBroadcastCall())
262268
return true
263269
}
264270

@@ -419,7 +425,7 @@ func (r *BroadcastProcessor) dispatchOutOfOrderMsgs() {
419425
if order <= r.orderIndex {
420426
for _, msg := range msgs {
421427
msg.Run(r.cancellationCtx, r.enqueueBroadcast)
422-
r.log("msg: dispatching out of order msg", "err", nil, "method", msg.CurrentMethod, "from", msg.SenderAddr)
428+
r.log("msg: dispatching out of order msg", nil, "method", msg.CurrentMethod, "from", msg.SenderAddr)
423429
}
424430
handledMethods = append(handledMethods, method)
425431
}

broadcast/router.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (r *BroadcastRouter) Send(broadcastID uint64, addr, method string, req any)
7171
return nil
7272
}
7373
err := errors.New("wrong req type")
74-
r.log("router: malformed msg", "err", err, "BroadcastID", broadcastID)
74+
r.log("router: malformed msg", err, "BroadcastID", broadcastID)
7575
return err
7676
}
7777

@@ -87,7 +87,7 @@ func (r *BroadcastRouter) routeBroadcast(broadcastID uint64, addr, method string
8787
return nil
8888
}
8989
err := errors.New("handler not found")
90-
r.log("router (broadcast): could not find handler", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
90+
r.log("router (broadcast): could not find handler", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
9191
return err
9292
}
9393

@@ -96,17 +96,17 @@ func (r *BroadcastRouter) routeClientReply(broadcastID uint64, addr, method stri
9696
if _, ok := r.clientHandlers[method]; ok && addr != "" {
9797
client, err := r.getClient(addr)
9898
if err != nil {
99-
r.log("router (reply): could not get client", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
99+
r.log("router (reply): could not get client", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
100100
return err
101101
}
102102
err = client.SendMsg(broadcastID, method, resp.getResponse(), r.dialTimeout)
103-
r.log("router (reply): could not send reply", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
103+
r.log("router (reply): sending reply to client", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
104104
return err
105105
}
106106
// the server can receive a broadcast from another server before a client sends a direct message.
107107
// it should thus wait for a potential message from the client. otherwise, it should be removed.
108108
err := errors.New("not routed")
109-
r.log("router (reply): could not find handler", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
109+
r.log("router (reply): could not find handler", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
110110
return err
111111
}
112112

@@ -141,9 +141,15 @@ func (r *BroadcastRouter) getClient(addr string) (*Client, error) {
141141
return client, nil
142142
}
143143

144-
func (r *BroadcastRouter) log(msg string, args ...any) {
144+
func (r *BroadcastRouter) log(msg string, err error, args ...any) {
145145
if r.logger != nil {
146-
r.logger.Debug(msg, args...)
146+
if err != nil {
147+
args = append(args, "err", err.Error())
148+
r.logger.Error(msg, args...)
149+
} else {
150+
args = append(args, "err", nil)
151+
r.logger.Info(msg, args...)
152+
}
147153
}
148154
}
149155

0 commit comments

Comments
 (0)