Skip to content

Commit c88e88a

Browse files
fix(broadcast): client sends cancellation upon timeouts
1 parent ef48e27 commit c88e88a

13 files changed

+170
-105
lines changed

broadcast/shard_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func TestShard(t *testing.T) {
6161
OriginMethod: "testMethod",
6262
IsBroadcastClient: true,
6363
ReceiveChan: make(chan shardResponse),
64+
Ctx: context.Background(),
6465
},
6566
out: nil,
6667
},
@@ -71,6 +72,7 @@ func TestShard(t *testing.T) {
7172
OriginMethod: "testMethod",
7273
IsBroadcastClient: false,
7374
ReceiveChan: make(chan shardResponse),
75+
Ctx: context.Background(),
7476
},
7577
out: nil,
7678
},
@@ -81,6 +83,7 @@ func TestShard(t *testing.T) {
8183
OriginMethod: "testMethod",
8284
IsBroadcastClient: false,
8385
ReceiveChan: make(chan shardResponse),
86+
Ctx: context.Background(),
8487
},
8588
out: nil,
8689
},

broadcastCall.go broadcastcall.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
"google.golang.org/protobuf/reflect/protoreflect"
88
)
99

10-
// broadcastCallData holds the message, destination nodes, method identifier,
10+
// BroadcastCallData holds the message, destination nodes, method identifier,
1111
// and other information necessary to perform the various quorum call types
1212
// supported by Gorums.
13-
type broadcastCallData struct {
13+
type BroadcastCallData struct {
1414
Message protoreflect.ProtoMessage
1515
Method string
1616
BroadcastID uint64 // a unique identifier for the current broadcast request
@@ -23,7 +23,7 @@ type broadcastCallData struct {
2323

2424
// checks whether the given address is contained in the given subset
2525
// of server addresses. Will return true if a subset is not given.
26-
func (bcd *broadcastCallData) inSubset(addr string) bool {
26+
func (bcd *BroadcastCallData) inSubset(addr string) bool {
2727
if bcd.ServerAddresses == nil || len(bcd.ServerAddresses) <= 0 {
2828
return true
2929
}
@@ -35,8 +35,8 @@ func (bcd *broadcastCallData) inSubset(addr string) bool {
3535
return false
3636
}
3737

38-
// broadcastCall performs a multicast call on the configuration.
39-
func (c RawConfiguration) broadcastCall(ctx context.Context, d broadcastCallData) {
38+
// BroadcastCall performs a multicast call on the configuration.
39+
func (c RawConfiguration) BroadcastCall(ctx context.Context, d BroadcastCallData) {
4040
md := &ordering.Metadata{MessageID: c.getMsgID(), Method: d.Method, BroadcastMsg: &ordering.BroadcastMsg{
4141
IsBroadcastClient: d.IsBroadcastClient,
4242
BroadcastID: d.BroadcastID,

clientserver.go

-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req pr
9999
select {
100100
case <-clientCtx.Done():
101101
// client provided ctx
102-
//slog.Info("clientserver: clientCtx done", "resps", len(resps))
103102
return
104103
case <-ctx.Done():
105104
// request ctx. this is a child to the server ctx.

cmd/protoc-gen-gorums/dev/zorums_broadcastcall_gorums.pb.go

+21-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/protoc-gen-gorums/gengorums/template_broadcastcall.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,19 @@ func (c *Configuration) {{.Method.GoName}}(ctx context.Context, in *{{in .GenFil
2323
if c.qspec == nil {
2424
return nil, fmt.Errorf("a qspec is not defined")
2525
}
26-
doneChan, cd := c.srv.AddRequest(c.snowflake.NewBroadcastID(), ctx, in, gorums.ConvertToType(c.qspec.{{.Method.GoName}}QF), "{{.Method.Desc.FullName}}")
26+
broadcastID := c.snowflake.NewBroadcastID()
27+
doneChan, cd := c.srv.AddRequest(broadcastID, ctx, in, gorums.ConvertToType(c.qspec.{{.Method.GoName}}QF), "{{.Method.Desc.FullName}}")
2728
c.RawConfiguration.Multicast(ctx, cd, gorums.WithNoSendWaiting())
2829
var response {{$protoMessage}}
2930
var ok bool
3031
select {
3132
case response, ok = <-doneChan:
3233
case <-ctx.Done():
34+
bd := gorums.BroadcastCallData{
35+
Method: gorums.Cancellation,
36+
BroadcastID: broadcastID,
37+
}
38+
c.RawConfiguration.BroadcastCall(context.Background(), bd)
3339
return nil, fmt.Errorf("context cancelled")
3440
}
3541
if !ok {

handler.go

+8-9
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (srv *broadcastServer) sendToClientHandler(broadcastID uint64, resp protore
132132
}
133133

134134
func (srv *broadcastServer) forwardHandler(req RequestTypes, method string, broadcastID uint64, forwardAddr, originAddr string) {
135-
cd := broadcastCallData{
135+
cd := BroadcastCallData{
136136
Message: req,
137137
Method: method,
138138
BroadcastID: broadcastID,
@@ -142,7 +142,7 @@ func (srv *broadcastServer) forwardHandler(req RequestTypes, method string, broa
142142
}
143143
srv.viewMutex.RLock()
144144
// drop request if a view change has occured
145-
srv.view.broadcastCall(context.Background(), cd)
145+
srv.view.BroadcastCall(context.Background(), cd)
146146
srv.viewMutex.RUnlock()
147147
}
148148

@@ -155,21 +155,20 @@ func (srv *broadcastServer) doneHandler(broadcastID uint64) {
155155
}
156156

157157
func (srv *broadcastServer) canceler(broadcastID uint64, srvAddrs []string) {
158-
cd := broadcastCallData{
159-
//Message: &emptypb.Empty{},
158+
cd := BroadcastCallData{
160159
Message: nil,
161160
Method: Cancellation,
162161
BroadcastID: broadcastID,
163162
ServerAddresses: srvAddrs,
164163
}
165164
srv.viewMutex.RLock()
166165
// drop request if a view change has occured
167-
srv.view.broadcastCall(context.Background(), cd)
166+
srv.view.BroadcastCall(context.Background(), cd)
168167
srv.viewMutex.RUnlock()
169168
}
170169

171170
func (srv *broadcastServer) serverBroadcastHandler(method string, req RequestTypes, opts ...broadcast.BroadcastOptions) {
172-
cd := broadcastCallData{
171+
cd := BroadcastCallData{
173172
Message: req,
174173
Method: method,
175174
BroadcastID: srv.manager.NewBroadcastID(),
@@ -178,7 +177,7 @@ func (srv *broadcastServer) serverBroadcastHandler(method string, req RequestTyp
178177
}
179178
srv.viewMutex.RLock()
180179
// drop request if a view change has occured
181-
srv.view.broadcastCall(context.Background(), cd)
180+
srv.view.BroadcastCall(context.Background(), cd)
182181
srv.viewMutex.RUnlock()
183182
}
184183

@@ -188,7 +187,7 @@ func (srv *Server) SendToClientHandler(resp protoreflect.ProtoMessage, err error
188187

189188
func (srv *broadcastServer) registerBroadcastFunc(method string) {
190189
srv.manager.AddHandler(method, broadcast.ServerHandler(func(ctx context.Context, in protoreflect.ProtoMessage, broadcastID uint64, originAddr, originMethod string, options broadcast.BroadcastOptions, id uint32, addr string) {
191-
cd := broadcastCallData{
190+
cd := BroadcastCallData{
192191
Message: in,
193192
Method: method,
194193
BroadcastID: broadcastID,
@@ -200,7 +199,7 @@ func (srv *broadcastServer) registerBroadcastFunc(method string) {
200199
}
201200
srv.viewMutex.RLock()
202201
// drop request if a view change has occured
203-
srv.view.broadcastCall(ctx, cd)
202+
srv.view.BroadcastCall(ctx, cd)
204203
srv.viewMutex.RUnlock()
205204
}))
206205
/*srv.manager.AddServerHandler(method, func(ctx context.Context, in protoreflect.ProtoMessage, broadcastID uint64, originAddr, originMethod string, options broadcast.BroadcastOptions, id uint32, addr string) {

multicast.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (c RawConfiguration) Multicast(ctx context.Context, d QuorumCallData, opts
2929
continue // don't send if no msg
3030
}
3131
}
32-
go n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}, opts: o}, replyChan, false)
32+
n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: msg}, opts: o}, replyChan, false)
3333
sentMsgs++
3434
}
3535

node.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ func (n *RawNode) connect(mgr *RawManager) error {
6868
return nil
6969
}
7070
n.channel = newChannel(n)
71-
//if err := n.channel.connect(); err != nil {
72-
// return nodeError{nodeID: n.id, cause: err}
73-
//}
71+
if err := n.channel.connect(); err != nil {
72+
return nodeError{nodeID: n.id, cause: err}
73+
}
7474
return nil
7575
}
7676

tests/broadcast/broadcast_gorums.pb.go

+43-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)