Skip to content

Commit 4715dc9

Browse files
refactor(broadcast): removed async option
the async option is redundant in broadcast because running synchronously can easily cause a deadlock
1 parent 344a6d0 commit 4715dc9

16 files changed

+497
-345
lines changed

broadcast.go

+7-10
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type broadcastServer struct {
2424
timeout time.Duration
2525
clientReqs *RequestMap
2626
stopChan chan struct{}
27-
async bool
2827
logger *slog.Logger
2928
}
3029

@@ -43,6 +42,8 @@ func newBroadcastServer(logger *slog.Logger) *broadcastServer {
4342
}
4443
}
4544

45+
func (srv *broadcastServer) stop() {}
46+
4647
func (srv *broadcastServer) alreadyBroadcasted(broadcastID string, method string) bool {
4748
_, ok := srv.broadcastedMsgs[broadcastID]
4849
if !ok {
@@ -57,8 +58,9 @@ func (srv *broadcastServer) alreadyBroadcasted(broadcastID string, method string
5758

5859
func (srv *broadcastServer) run() {
5960
for msg := range srv.broadcastChan {
60-
if handler, ok := srv.handlers[msg.method]; ok {
61-
handler(msg.ctx, msg.request, msg.metadata, msg.srvAddrs)
61+
if broadcastCall, ok := srv.handlers[msg.method]; ok {
62+
// it runs a interceptor prior to broadcastCall, hence a different signature.
63+
broadcastCall(msg.ctx, msg.request, msg.metadata, msg.srvAddrs)
6264
}
6365
msg.setFinished()
6466
}
@@ -106,11 +108,7 @@ func (srv *broadcastServer) handle(response *responseMsg) {
106108
}
107109
}
108110

109-
func (srv *broadcastServer) clientReturn(resp ResponseTypes, err error, metadata BroadcastMetadata) {
110-
srv.returnToClient(metadata.BroadcastID, resp, err)
111-
}
112-
113-
func (srv *broadcastServer) returnToClient(broadcastID string, resp ResponseTypes, err error) {
111+
func (srv *broadcastServer) sendToClient(broadcastID string, resp ResponseTypes, err error) {
114112
if !srv.alreadyReturnedToClient(broadcastID) {
115113
srv.responseChan <- newResponseMessage(resp, err, broadcastID)
116114
}
@@ -121,12 +119,11 @@ func (srv *broadcastServer) alreadyReturnedToClient(broadcastID string) bool {
121119
}
122120

123121
func (srv *broadcastServer) addClientRequest(metadata *ordering.Metadata, ctx ServerCtx, finished chan<- *Message) {
124-
done := make(chan struct{})
125122
srv.clientReqs.Add(metadata.BroadcastMsg.GetBroadcastID(), clientRequest{
126123
id: uuid.New().String(),
127124
ctx: ctx,
128125
finished: finished,
129126
metadata: metadata,
130-
doneChan: done,
127+
doneChan: make(chan struct{}),
131128
})
132129
}

broadcastTypes.go

+26-15
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type ResponseTypes interface {
3232
}
3333

3434
type BroadcastHandlerFunc func(method string, req RequestTypes, metadata BroadcastMetadata, data ...BroadcastOptions)
35-
type BroadcastReturnToClientHandlerFunc func(resp ResponseTypes, err error, metadata BroadcastMetadata)
35+
type BroadcastSendToClientHandlerFunc func(broadcastID string, resp ResponseTypes, err error)
3636

3737
type defaultImplementationFunc[T RequestTypes, V ResponseTypes] func(ServerCtx, T) (V, error)
3838

@@ -84,13 +84,23 @@ type clientRequest struct {
8484
doneChan chan struct{}
8585
}
8686

87-
type SpBroadcast struct {
88-
BroadcastHandler BroadcastHandlerFunc
89-
ReturnToClientHandler BroadcastReturnToClientHandlerFunc
87+
// The BroadcastOrchestrator is used as a container for all
88+
// broadcast handlers. The BroadcastHandler takes in a method
89+
// and schedules it for broadcasting. SendToClientHandler works
90+
// similarly but it sends the message to the calling client.
91+
//
92+
// It is necessary to use an orchestrator to hide certain
93+
// implementation details, such as internal methods on the
94+
// broadcaster struct. The BroadcastOrchestrator will thus
95+
// be an unimported field in the broadcaster struct in the
96+
// generated code.
97+
type BroadcastOrchestrator struct {
98+
BroadcastHandler BroadcastHandlerFunc
99+
SendToClientHandler BroadcastSendToClientHandlerFunc
90100
}
91101

92-
func NewSpBroadcastStruct() *SpBroadcast {
93-
return &SpBroadcast{}
102+
func NewBroadcastOrchestrator() *BroadcastOrchestrator {
103+
return &BroadcastOrchestrator{}
94104
}
95105

96106
type BroadcastOption func(*BroadcastOptions)
@@ -132,12 +142,6 @@ func WithoutUniquenessChecks() BroadcastOption {
132142
}
133143
}
134144

135-
// not sure if this is necessary because the implementer
136-
// can decide to run the broadcast in a go routine.
137-
func WithoutWaiting() BroadcastOption {
138-
return func(b *BroadcastOptions) {}
139-
}
140-
141145
// returns a listener for the given address.
142146
// panics upon errors.
143147
func WithListener(listenAddr string) net.Listener {
@@ -162,22 +166,29 @@ func NewBroadcastOptions() BroadcastOptions {
162166
}
163167

164168
type broadcaster interface {
165-
setMetadataHandler(func(metadata BroadcastMetadata))
169+
setMetadataHandler(func(metadata BroadcastMetadata), func())
166170
setMetadata(metadata BroadcastMetadata)
171+
resetMetadata()
167172
}
168173

169174
type Broadcaster struct {
170-
metadataHandler func(metadata BroadcastMetadata)
175+
metadataHandler func(metadata BroadcastMetadata)
176+
resetMetadataHandler func()
171177
}
172178

173-
func (b *Broadcaster) setMetadataHandler(handler func(metadata BroadcastMetadata)) {
179+
func (b *Broadcaster) setMetadataHandler(handler func(metadata BroadcastMetadata), resetHandler func()) {
174180
b.metadataHandler = handler
181+
b.resetMetadataHandler = resetHandler
175182
}
176183

177184
func (b *Broadcaster) setMetadata(metadata BroadcastMetadata) {
178185
b.metadataHandler(metadata)
179186
}
180187

188+
func (b *Broadcaster) resetMetadata() {
189+
b.resetMetadataHandler()
190+
}
191+
181192
func NewBroadcaster() *Broadcaster {
182193
return &Broadcaster{}
183194
}

cmd/protoc-gen-gorums/dev/mgr.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func NewManager(opts ...gorums.ManagerOption) (mgr *Manager) {
3737
// A new configuration can also be created from an existing configuration,
3838
// using the And, WithNewNodes, Except, and WithoutNodes methods.
3939
func (m *Manager) NewConfiguration(opts ...gorums.ConfigOption) (c *Configuration, err error) {
40-
if len(opts) < 1 || len(opts) > 2 {
40+
if len(opts) < 1 || len(opts) > 3 {
4141
return nil, fmt.Errorf("wrong number of options: %d", len(opts))
4242
}
4343
c = &Configuration{}
@@ -62,10 +62,10 @@ func (m *Manager) NewConfiguration(opts ...gorums.ConfigOption) (c *Configuratio
6262
}
6363
}
6464
// return an error if the QuorumSpec interface is not empty and no implementation was provided.
65-
var test interface{} = struct{}{}
66-
if _, empty := test.(QuorumSpec); !empty && c.qspec == nil {
67-
return nil, fmt.Errorf("missing required QuorumSpec")
68-
}
65+
//var test interface{} = struct{}{}
66+
//if _, empty := test.(QuorumSpec); !empty && c.qspec == nil {
67+
// return nil, fmt.Errorf("missing required QuorumSpec")
68+
//}
6969
return c, nil
7070
}
7171

cmd/protoc-gen-gorums/dev/server.go

+28-13
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,27 @@ import (
55

66
"github.com/relab/gorums"
77
grpc "google.golang.org/grpc"
8+
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
89
)
910

1011
type Server struct {
1112
*gorums.Server
12-
View *Configuration
13+
broadcast *Broadcast
14+
View *Configuration
1315
}
1416

1517
func NewServer() *Server {
1618
srv := &Server{
1719
Server: gorums.NewServer(),
1820
}
1921
b := &Broadcast{
20-
Broadcaster: gorums.NewBroadcaster(),
21-
sp: gorums.NewSpBroadcastStruct(),
22+
Broadcaster: gorums.NewBroadcaster(),
23+
orchestrator: gorums.NewBroadcastOrchestrator(),
24+
metadata: gorums.BroadcastMetadata{},
2225
}
23-
srv.RegisterBroadcastStruct(b, configureHandlers(b), configureMetadata(b))
26+
srv.broadcast = b
27+
set, reset := configureMetadata(b)
28+
srv.RegisterBroadcaster(b, configureHandlers(b), set, reset)
2429
return srv
2530
}
2631

@@ -32,21 +37,23 @@ func (srv *Server) SetView(config *Configuration) {
3237

3338
type Broadcast struct {
3439
*gorums.Broadcaster
35-
sp *gorums.SpBroadcast
36-
metadata gorums.BroadcastMetadata
40+
orchestrator *gorums.BroadcastOrchestrator
41+
metadata gorums.BroadcastMetadata
3742
}
3843

39-
func configureHandlers(b *Broadcast) func(bh gorums.BroadcastHandlerFunc, ch gorums.BroadcastReturnToClientHandlerFunc) {
40-
return func(bh gorums.BroadcastHandlerFunc, ch gorums.BroadcastReturnToClientHandlerFunc) {
41-
b.sp.BroadcastHandler = bh
42-
b.sp.ReturnToClientHandler = ch
44+
func configureHandlers(b *Broadcast) func(bh gorums.BroadcastHandlerFunc, ch gorums.BroadcastSendToClientHandlerFunc) {
45+
return func(bh gorums.BroadcastHandlerFunc, ch gorums.BroadcastSendToClientHandlerFunc) {
46+
b.orchestrator.BroadcastHandler = bh
47+
b.orchestrator.SendToClientHandler = ch
4348
}
4449
}
4550

46-
func configureMetadata(b *Broadcast) func(metadata gorums.BroadcastMetadata) {
51+
func configureMetadata(b *Broadcast) (func(metadata gorums.BroadcastMetadata), func()) {
4752
return func(metadata gorums.BroadcastMetadata) {
48-
b.metadata = metadata
49-
}
53+
b.metadata = metadata
54+
}, func() {
55+
b.metadata = gorums.BroadcastMetadata{}
56+
}
5057
}
5158

5259
// Returns a readonly struct of the metadata used in the broadcast.
@@ -76,3 +83,11 @@ func (c *Configuration) RegisterClientServer(lis net.Listener, opts ...grpc.Serv
7683
c.srv = srvImpl
7784
return nil
7885
}
86+
87+
func (b *Broadcast) SendToClient(resp protoreflect.ProtoMessage, err error) {
88+
b.orchestrator.SendToClientHandler(b.metadata.BroadcastID, resp, err)
89+
}
90+
91+
func (srv *Server) SendToClient(resp protoreflect.ProtoMessage, err error, broadcastID string) {
92+
srv.RetToClient(resp, err, broadcastID)
93+
}

0 commit comments

Comments
 (0)