Skip to content

Commit 5c68674

Browse files
feat(broadcast): added logging to clientserver
1 parent eb87024 commit 5c68674

File tree

6 files changed

+48
-15
lines changed

6 files changed

+48
-15
lines changed

channel.go

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gorums
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"math"
78
"math/rand"
89
"sync"
@@ -237,6 +238,7 @@ func (c *channel) sendMsg(req request) (err error) {
237238

238239
err = c.gorumsStream.SendMsg(req.msg)
239240
if err != nil {
241+
slog.Error("channel: couldnt send msg", "err", err)
240242
c.setLastErr(err)
241243
c.streamBroken.set()
242244
}

clientserver.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gorums
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"net"
78
"sync"
89
"time"
@@ -43,6 +44,7 @@ type csr struct {
4344
}
4445

4546
type ClientServer struct {
47+
id uint64 // should correpond to the ID given to the manager
4648
mu sync.Mutex
4749
csr map[uint64]*csr
4850
reqChan chan *ClientRequest
@@ -52,10 +54,14 @@ type ClientServer struct {
5254
inProgress uint64
5355
grpcServer *grpc.Server
5456
handlers map[string]requestHandler
57+
logger *slog.Logger
5558
ordering.UnimplementedGorumsServer
5659
}
5760

5861
func (srv *ClientServer) Stop() {
62+
if srv.logger != nil {
63+
srv.logger.Info("clientserver: stopped")
64+
}
5965
if srv.cancelCtx != nil {
6066
srv.cancelCtx()
6167
}
@@ -85,12 +91,16 @@ func (srv *ClientServer) AddRequest(broadcastID uint64, clientCtx context.Contex
8591
}
8692
srv.mu.Unlock()
8793

88-
go createReq(ctx, clientCtx, cancel, in, doneChan, respChan, handler)
94+
var logger *slog.Logger
95+
if srv.logger != nil {
96+
logger = srv.logger.With(slog.Uint64("BroadcastID", broadcastID))
97+
}
98+
go createReq(ctx, clientCtx, cancel, in, doneChan, respChan, handler, logger)
8999

90100
return doneChan, cd
91101
}
92102

93-
func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req protoreflect.ProtoMessage, doneChan chan protoreflect.ProtoMessage, respChan chan protoreflect.ProtoMessage, handler ReplySpecHandler) {
103+
func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req protoreflect.ProtoMessage, doneChan chan protoreflect.ProtoMessage, respChan chan protoreflect.ProtoMessage, handler ReplySpecHandler, logger *slog.Logger) {
94104
// make sure to cancel the req ctx when returning to
95105
// prevent a leaking ctx.
96106
defer cancel()
@@ -99,6 +109,9 @@ func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req pr
99109
select {
100110
case <-clientCtx.Done():
101111
// client provided ctx
112+
if logger != nil {
113+
logger.Warn("clientserver: stopped by client")
114+
}
102115
return
103116
case <-ctx.Done():
104117
// request ctx. this is a child to the server ctx.
@@ -110,6 +123,9 @@ func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req pr
110123
// chooses not to timeout the request and the server
111124
// goes down.
112125
close(doneChan)
126+
if logger != nil {
127+
logger.Warn("clientserver: stopped by server")
128+
}
113129
return
114130
case resp := <-respChan:
115131
// keep track of all responses thus far
@@ -122,6 +138,9 @@ func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req pr
122138
case <-ctx.Done():
123139
case <-clientCtx.Done():
124140
}
141+
if logger != nil {
142+
logger.Info("clientserver: req done")
143+
}
125144
close(doneChan)
126145
return
127146
}
@@ -141,6 +160,9 @@ func (srv *ClientServer) AddResponse(ctx context.Context, resp protoreflect.Prot
141160
if !ok {
142161
return fmt.Errorf("doesn't exist")
143162
}
163+
if srv.logger != nil {
164+
srv.logger.Info("clientserver: got a reply", "BroadcastID", broadcastID)
165+
}
144166
select {
145167
case <-ctx.Done():
146168
return ctx.Err()
@@ -191,13 +213,19 @@ func NewClientServer(lis net.Listener, opts ...ServerOption) *ClientServer {
191213
for _, opt := range opts {
192214
opt(&serverOpts)
193215
}
216+
var logger *slog.Logger
217+
if serverOpts.logger != nil {
218+
logger = serverOpts.logger.With(slog.Uint64("ClientID", serverOpts.machineID))
219+
}
194220
ctx, cancel := context.WithCancel(context.Background())
195221
srv := &ClientServer{
222+
id: serverOpts.machineID,
196223
ctx: ctx,
197224
cancelCtx: cancel,
198225
csr: make(map[uint64]*csr),
199226
grpcServer: grpc.NewServer(serverOpts.grpcOpts...),
200227
handlers: make(map[string]requestHandler),
228+
logger: logger,
201229
}
202230
ordering.RegisterGorumsServer(srv.grpcServer, srv)
203231
srv.lis = lis

cmd/protoc-gen-gorums/dev/mgr.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"net"
66

77
"github.com/relab/gorums"
8-
grpc "google.golang.org/grpc"
98
"google.golang.org/grpc/encoding"
109
)
1110

@@ -40,8 +39,8 @@ func (mgr *Manager) Close() {
4039
}
4140
}
4241

43-
func (mgr *Manager) AddClientServer(lis net.Listener, opts ...grpc.ServerOption) error {
44-
srv := gorums.NewClientServer(lis)
42+
func (mgr *Manager) AddClientServer(lis net.Listener, opts ...gorums.ServerOption) error {
43+
srv := gorums.NewClientServer(lis, opts...)
4544
srvImpl := &clientServerImpl{
4645
ClientServer: srv,
4746
}

cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/protoc-gen-gorums/gengorums/template_static.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,6 @@ type serverOptions struct {
115115
machineID uint64
116116
}
117117

118-
var defaultServerOptions = serverOptions{
119-
// Provide an illegal machineID to avoid unintentional collisions.
120-
// 0 is a valid MachineID and should not be used as default.
121-
machineID: uint64(broadcast.MaxMachineID) + 1,
122-
}
123-
124118
// ServerOption is used to change settings for the GorumsServer
125119
type ServerOption func(*serverOptions)
126120

@@ -164,6 +158,12 @@ func WithSLogger(logger *slog.Logger) ServerOption {
164158
}
165159
}
166160

161+
func WithSrvID(machineID uint64) ServerOption {
162+
return func(o *serverOptions) {
163+
o.machineID = machineID
164+
}
165+
}
166+
167167
// Server serves all ordering based RPCs using registered handlers.
168168
type Server struct {
169169
srv *orderingServer
@@ -175,7 +175,11 @@ type Server struct {
175175
// This function is intended for internal Gorums use.
176176
// You should call `NewServer` in the generated code instead.
177177
func NewServer(opts ...ServerOption) *Server {
178-
serverOpts := defaultServerOptions
178+
serverOpts := serverOptions{
179+
// Provide an illegal machineID to avoid unintentional collisions.
180+
// 0 is a valid MachineID and should not be used as default.
181+
machineID: uint64(broadcast.MaxMachineID) + 1,
182+
}
179183
for _, opt := range opts {
180184
opt(&serverOpts)
181185
}

0 commit comments

Comments
 (0)