Skip to content

Commit c7f38ef

Browse files
feat(broadcast): added logging
1 parent d490532 commit c7f38ef

13 files changed

+89
-36
lines changed

broadcast/processor.go

+26
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package broadcast
22

33
import (
44
"context"
5+
"log/slog"
56
"time"
67

78
"google.golang.org/protobuf/reflect/protoreflect"
@@ -16,6 +17,7 @@ type BroadcastProcessor struct {
1617
cancelFunc context.CancelFunc
1718
started time.Time
1819
ended time.Time
20+
logger *slog.Logger
1921

2022
cancellationCtx context.Context
2123
cancellationCtxCancel context.CancelFunc
@@ -60,12 +62,14 @@ func (p *BroadcastProcessor) handle(msg Content) {
6062
msg.ReceiveChan <- shardResponse{
6163
err: OutOfOrderErr{},
6264
}
65+
p.log("msg: out of order", "err", OutOfOrderErr{}, "method", msg.CurrentMethod, "from", msg.SenderAddr)
6366
} else {
6467
msg.ReceiveChan <- shardResponse{
6568
err: nil,
6669
reqCtx: p.cancellationCtx,
6770
enqueueBroadcast: p.enqueueBroadcast,
6871
}
72+
p.log("msg: processed", "err", nil, "method", msg.CurrentMethod, "from", msg.SenderAddr)
6973
}
7074
}
7175
defer func() {
@@ -79,13 +83,15 @@ func (p *BroadcastProcessor) handle(msg Content) {
7983
//close(p.broadcastChan)
8084
//close(p.sendChan)
8185
p.emptyChannels(metadata)
86+
p.log("processor stopped", "err", nil, "started", p.started, "ended", p.ended)
8287
}()
8388
for {
8489
select {
8590
case <-p.ctx.Done():
8691
return
8792
case bMsg := <-p.broadcastChan:
8893
if p.broadcastID != bMsg.BroadcastID {
94+
p.log("broadcast: wrong BroadcastID", "err", BroadcastIDErr{}, "type", bMsg.MsgType.String(), "stopping", false)
8995
continue
9096
}
9197
switch bMsg.MsgType {
@@ -110,6 +116,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
110116
new.ReceiveChan <- shardResponse{
111117
err: BroadcastIDErr{},
112118
}
119+
p.log("msg: wrong BroadcastID", "err", BroadcastIDErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
113120
continue
114121
}
115122
if new.IsCancellation {
@@ -119,6 +126,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
119126
new.ReceiveChan <- shardResponse{
120127
err: nil,
121128
}
129+
p.log("msg: received cancellation", "err", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
122130
continue
123131
}
124132

@@ -129,6 +137,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
129137
new.ReceiveChan <- shardResponse{
130138
err: ClientReqAlreadyReceivedErr{},
131139
}
140+
p.log("msg: duplicate client req", "err", ClientReqAlreadyReceivedErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
132141
continue
133142
}
134143
// important to set this option to prevent duplicate client reqs.
@@ -146,6 +155,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
146155
}
147156
p.cancellationCtxCancel()
148157
}()
158+
p.log("msg: received client req", "err", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
149159
}
150160

151161
metadata.update(new)
@@ -164,6 +174,7 @@ func (p *BroadcastProcessor) handle(msg Content) {
164174
err: err,
165175
}
166176
// slog.Info("receive: late", "err", err, "id", p.broadcastID)
177+
p.log("msg: late msg", "err", err, "method", new.CurrentMethod, "from", new.SenderAddr)
167178
return
168179
}
169180
if !p.isInOrder(new.CurrentMethod) {
@@ -172,22 +183,26 @@ func (p *BroadcastProcessor) handle(msg Content) {
172183
new.ReceiveChan <- shardResponse{
173184
err: OutOfOrderErr{},
174185
}
186+
p.log("msg: out of order", "err", OutOfOrderErr{}, "method", new.CurrentMethod, "from", new.SenderAddr)
175187
continue
176188
}
177189
new.ReceiveChan <- shardResponse{
178190
err: nil,
179191
reqCtx: p.cancellationCtx,
180192
enqueueBroadcast: p.enqueueBroadcast,
181193
}
194+
p.log("msg: processed", "err", nil, "method", new.CurrentMethod, "from", new.SenderAddr)
182195
}
183196
}
184197
}
185198

186199
func (p *BroadcastProcessor) handleCancellation(bMsg Msg, metadata *metadata) bool {
187200
if bMsg.Cancellation.end {
201+
p.log("broadcast: broadcast.Done() called", "err", nil, "type", bMsg.MsgType.String(), "stopping", true)
188202
return true
189203
}
190204
if !metadata.SentCancellation {
205+
p.log("broadcast: sent cancellation", "err", nil, "type", bMsg.MsgType.String(), "stopping", false)
191206
metadata.SentCancellation = true
192207
go p.router.Send(p.broadcastID, "", "", bMsg.Cancellation)
193208
}
@@ -201,17 +216,25 @@ func (p *BroadcastProcessor) handleBroadcast(bMsg Msg, methods []string, metadat
201216
return false
202217
}
203218
p.router.Send(p.broadcastID, metadata.OriginAddr, metadata.OriginMethod, bMsg.Msg)
219+
p.log("broadcast: sending broadcast", "err", nil, "type", bMsg.MsgType.String(), "stopping", false, "isBroadcastCall", metadata.isBroadcastCall())
204220

205221
p.updateOrder(bMsg.Method)
206222
p.dispatchOutOfOrderMsgs()
207223
return true
208224
}
209225

226+
func (p *BroadcastProcessor) log(msg string, args ...any) {
227+
if p.logger != nil {
228+
p.logger.Debug(msg, args...)
229+
}
230+
}
231+
210232
func (p *BroadcastProcessor) handleReply(bMsg Msg, metadata *metadata) bool {
211233
// BroadcastCall if origin addr is non-empty.
212234
if metadata.isBroadcastCall() {
213235
go p.router.Send(p.broadcastID, metadata.OriginAddr, metadata.OriginMethod, bMsg.Reply)
214236
// the request is done becuase we have sent a reply to the client
237+
p.log("broadcast: sending reply to client", "err", nil, "type", bMsg.MsgType.String(), "stopping", true, "isBroadcastCall", metadata.isBroadcastCall())
215238
return true
216239
}
217240
// QuorumCall if origin addr is empty.
@@ -231,9 +254,11 @@ func (p *BroadcastProcessor) handleReply(bMsg Msg, metadata *metadata) bool {
231254
// the request is not done yet because we have not replied to
232255
// the client.
233256
//slog.Info("reply: late", "err", err, "id", p.broadcastID)
257+
p.log("broadcast: failed to send reply to client", "err", err, "type", bMsg.MsgType.String(), "stopping", false, "isBroadcastCall", metadata.isBroadcastCall())
234258
return false
235259
}
236260
// the request is done becuase we have sent a reply to the client
261+
p.log("broadcast: sending reply to client", "err", err, "type", bMsg.MsgType.String(), "stopping", true, "isBroadcastCall", metadata.isBroadcastCall())
237262
return true
238263
}
239264

@@ -394,6 +419,7 @@ func (r *BroadcastProcessor) dispatchOutOfOrderMsgs() {
394419
if order <= r.orderIndex {
395420
for _, msg := range msgs {
396421
msg.Run(r.cancellationCtx, r.enqueueBroadcast)
422+
r.log("msg: dispatching out of order msg", "err", nil, "method", msg.CurrentMethod, "from", msg.SenderAddr)
397423
}
398424
handledMethods = append(handledMethods, method)
399425
}

broadcast/router.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ func (r *BroadcastRouter) Send(broadcastID uint64, addr, method string, req any)
7070
r.canceler(broadcastID, val.srvAddrs)
7171
return nil
7272
}
73-
return errors.New("wrong req type")
73+
err := errors.New("wrong req type")
74+
r.log("router: malformed msg", "err", err, "BroadcastID", broadcastID)
75+
return err
7476
}
7577

7678
func (r *BroadcastRouter) Connect(addr string) {
@@ -84,21 +86,28 @@ func (r *BroadcastRouter) routeBroadcast(broadcastID uint64, addr, method string
8486
handler(msg.ctx, msg.request, broadcastID, addr, method, msg.options, r.id, r.addr)
8587
return nil
8688
}
87-
return errors.New("not found")
89+
err := errors.New("handler not found")
90+
r.log("router (broadcast): could not find handler", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
91+
return err
8892
}
8993

9094
func (r *BroadcastRouter) routeClientReply(broadcastID uint64, addr, method string, resp *reply) error {
9195
// the client has initiated a broadcast call and the reply should be sent as an RPC
9296
if _, ok := r.clientHandlers[method]; ok && addr != "" {
9397
client, err := r.getClient(addr)
9498
if err != nil {
99+
r.log("router (reply): could not get client", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
95100
return err
96101
}
97-
return client.SendMsg(broadcastID, method, resp.getResponse(), r.dialTimeout)
102+
err = client.SendMsg(broadcastID, method, resp.getResponse(), r.dialTimeout)
103+
r.log("router (reply): could not send reply", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
104+
return err
98105
}
99106
// the server can receive a broadcast from another server before a client sends a direct message.
100107
// it should thus wait for a potential message from the client. otherwise, it should be removed.
101-
return errors.New("not routed")
108+
err := errors.New("not routed")
109+
r.log("router (reply): could not find handler", "err", err, "BroadcastID", broadcastID, "addr", addr, "method", method)
110+
return err
102111
}
103112

104113
func (r *BroadcastRouter) getClient(addr string) (*Client, error) {
@@ -132,6 +141,12 @@ func (r *BroadcastRouter) getClient(addr string) (*Client, error) {
132141
return client, nil
133142
}
134143

144+
func (r *BroadcastRouter) log(msg string, args ...any) {
145+
if r.logger != nil {
146+
r.logger.Debug(msg, args...)
147+
}
148+
}
149+
135150
type msgType int
136151

137152
const (

broadcast/shard.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package broadcast
22

33
import (
44
"context"
5+
"log/slog"
56
"sync"
67
"time"
78
)
@@ -37,12 +38,13 @@ type shard struct {
3738
nextGC time.Time
3839
shardBuffer int
3940
sendBuffer int
41+
logger *slog.Logger
4042

4143
preserveOrdering bool
4244
order map[string]int
4345
}
4446

45-
func createShards(ctx context.Context, shardBuffer, sendBuffer int, router Router, order map[string]int, reqTTL time.Duration) []*shard {
47+
func createShards(ctx context.Context, shardBuffer, sendBuffer int, router Router, order map[string]int, reqTTL time.Duration, logger *slog.Logger) []*shard {
4648
shards := make([]*shard, NumShards)
4749
for i := range shards {
4850
ctx, cancel := context.WithCancel(ctx)
@@ -60,6 +62,7 @@ func createShards(ctx context.Context, shardBuffer, sendBuffer int, router Route
6062
router: router,
6163
preserveOrdering: order != nil,
6264
order: order,
65+
logger: logger,
6366
}
6467
}
6568
return shards
@@ -375,6 +378,10 @@ func (s *shard) addProcessor2(sendBuffer int, msg Content) (*BroadcastProcessor,
375378
// should only affect the current shard and not the others.
376379
ctx, cancel := context.WithTimeout(s.ctx, s.reqTTL)
377380
//req := &BroadcastRequest{
381+
var logger *slog.Logger
382+
if s.logger != nil {
383+
logger = s.logger.With(slog.Uint64("BroadcastID", msg.BroadcastID))
384+
}
378385
req := &BroadcastProcessor{
379386
ctx: ctx,
380387
cancelFunc: cancel,
@@ -395,6 +402,7 @@ func (s *shard) addProcessor2(sendBuffer int, msg Content) (*BroadcastProcessor,
395402
cancellationCtxCancel: msg.CancelCtx,
396403
executionOrder: s.order,
397404
//sendCancellation: new(sync.Once),
405+
logger: logger,
398406
}
399407
s.reqs[msg.BroadcastID] = req
400408
//go req.handle(s.router, msg.BroadcastID, msg)

broadcast/state.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewState(logger *slog.Logger, router Router, order map[string]int) *Broadca
5555
sendBuffer := 30
5656
TTL := 5 * time.Minute
5757
ctx, cancel := context.WithCancel(context.Background())
58-
shards := createShards(ctx, shardBuffer, sendBuffer, router, order, TTL)
58+
shards := createShards(ctx, shardBuffer, sendBuffer, router, order, TTL, logger)
5959
state := &BroadcastState{
6060
parentCtx: ctx,
6161
parentCtxCancelFunc: cancel,
@@ -116,7 +116,7 @@ func (s *BroadcastState) reset() {
116116
ctx, cancel := context.WithCancel(context.Background())
117117
s.parentCtx = ctx
118118
s.parentCtxCancelFunc = cancel
119-
s.shards = createShards(ctx, s.shardBuffer, s.sendBuffer, s.router, s.order, s.reqTTL)
119+
s.shards = createShards(ctx, s.shardBuffer, s.sendBuffer, s.router, s.order, s.reqTTL, s.logger)
120120
s.RunShards()
121121
for _, client := range s.clients {
122122
client.Close()
@@ -173,6 +173,7 @@ type Content struct {
173173
IsCancellation bool
174174
OriginAddr string
175175
OriginMethod string
176+
SenderAddr string
176177
CurrentMethod string
177178
ReceiveChan chan shardResponse
178179
SendFn func(resp protoreflect.ProtoMessage, err error) error

broadcastcall.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ func (bcd *BroadcastCallData) inSubset(addr string) bool {
3535
return false
3636
}
3737

38-
// BroadcastCall performs a multicast call on the configuration.
39-
func (c RawConfiguration) BroadcastCall(ctx context.Context, d BroadcastCallData) {
38+
// BroadcastCall performs a broadcast on the configuration.
39+
func (c RawConfiguration) BroadcastCall(ctx context.Context, d BroadcastCallData, opts ...CallOption) {
4040
md := &ordering.Metadata{MessageID: c.getMsgID(), Method: d.Method, BroadcastMsg: &ordering.BroadcastMsg{
4141
IsBroadcastClient: d.IsBroadcastClient,
4242
BroadcastID: d.BroadcastID,
4343
SenderAddr: d.SenderAddr,
4444
OriginAddr: d.OriginAddr,
4545
OriginMethod: d.OriginMethod,
4646
}}
47-
o := getCallOptions(E_Broadcast, nil)
47+
o := getCallOptions(E_Broadcast, opts)
4848

4949
var replyChan chan response
5050
if !o.noSendWaiting {
@@ -82,6 +82,7 @@ func (c RawConfiguration) BroadcastCall(ctx context.Context, d BroadcastCallData
8282
n.channel.enqueueSlow(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}, opts: o})
8383
}
8484

85+
// if noSendWaiting is set, we will not wait for confirmation from the channel before returning.
8586
if o.noSendWaiting {
8687
return
8788
}
@@ -90,5 +91,4 @@ func (c RawConfiguration) BroadcastCall(ctx context.Context, d BroadcastCallData
9091
for ; sentMsgs > 0; sentMsgs-- {
9192
<-replyChan
9293
}
93-
close(replyChan)
9494
}

clientserver.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ func (srv *ClientServer) Stop() {
6464
}
6565
}
6666

67-
func (srv *ClientServer) AddRequest(broadcastID uint64, clientCtx context.Context, in protoreflect.ProtoMessage, handler ReplySpecHandler, method string) (chan protoreflect.ProtoMessage, QuorumCallData) {
68-
cd := QuorumCallData{
67+
func (srv *ClientServer) AddRequest(broadcastID uint64, clientCtx context.Context, in protoreflect.ProtoMessage, handler ReplySpecHandler, method string) (chan protoreflect.ProtoMessage, BroadcastCallData) {
68+
cd := BroadcastCallData{
6969
Message: in,
7070
Method: method,
7171

cmd/protoc-gen-gorums/dev/zorums_broadcastcall_gorums.pb.go

+6-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)