Skip to content

Commit 1f60773

Browse files
doc(broadcast): added documentation for broadcast options
1 parent c923d79 commit 1f60773

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

broadcast.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func newBroadcastServer(serverOpts *serverOptions) *broadcastServer {
4646
logger: serverOpts.logger,
4747
machineID: serverOpts.machineID,
4848
}
49-
srv.manager = broadcast.NewBroadcastManager(serverOpts.logger, createClient, srv.canceler, serverOpts.executionOrder, serverOpts.clientDialTimeout, serverOpts.reqTTL, serverOpts.shardBuffer, serverOpts.sendBuffer)
49+
srv.manager = broadcast.NewBroadcastManager(serverOpts.logger, createClient, srv.canceler, serverOpts.executionOrder, serverOpts.clientDialTimeout, serverOpts.reqTTL, serverOpts.shardBuffer, serverOpts.sendBuffer, serverOpts.grpcDialOpts...)
5050
srv.manager.AddAddr(srv.id, serverOpts.listenAddr, srv.machineID)
5151
return srv
5252
}
@@ -112,30 +112,41 @@ func NewBroadcastOrchestrator(srv *Server) *BroadcastOrchestrator {
112112

113113
type BroadcastOption func(*broadcast.BroadcastOptions)
114114

115+
// WithSubset enables broadcasting to a subset of the servers in the view.
116+
// It has the same function as broadcast.To().
115117
func WithSubset(srvAddrs ...string) BroadcastOption {
116118
return func(b *broadcast.BroadcastOptions) {
117119
b.ServerAddresses = srvAddrs
118120
}
119121
}
120122

123+
// WithoutSelf prevents the server from broadcasting to itself.
121124
func WithoutSelf() BroadcastOption {
122125
return func(b *broadcast.BroadcastOptions) {
123126
b.SkipSelf = true
124127
}
125128
}
126129

130+
// ProgressTo allows the server to accept messages to the given method.
131+
// Should only be used if the ServerOption WithOrder() is used.
127132
func ProgressTo(method string) BroadcastOption {
128133
return func(b *broadcast.BroadcastOptions) {
129134
b.ProgressTo = method
130135
}
131136
}
132137

138+
// AllowDuplication allows the server to broadcast more than once
139+
// to the same RPC method for a particular broadcast request.
133140
func AllowDuplication() BroadcastOption {
134141
return func(b *broadcast.BroadcastOptions) {
135142
b.AllowDuplication = true
136143
}
137144
}
138145

146+
// WithRelationToRequest allows for broadcasting outside a
147+
// server handler related to a specific broadcastID.
148+
// It is not recommended to use this method. Use the broadcast
149+
// struct provided with a broadcast request instead.
139150
func WithRelationToRequest(broadcastID uint64) BroadcastOption {
140151
return func(b *broadcast.BroadcastOptions) {
141152
b.RelatedToReq = broadcastID

broadcast/manager.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ type manager struct {
2828
logger *slog.Logger
2929
}
3030

31-
func NewBroadcastManager(logger *slog.Logger, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), canceler func(broadcastID uint64, srvAddrs []string), order map[string]int, dialTimeout, reqTTL time.Duration, shardBuffer, sendBuffer int) Manager {
32-
router := NewRouter(logger, createClient, canceler, dialTimeout)
31+
func NewBroadcastManager(logger *slog.Logger, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), canceler func(broadcastID uint64, srvAddrs []string), order map[string]int, dialTimeout, reqTTL time.Duration, shardBuffer, sendBuffer int, dialOpts ...grpc.DialOption) Manager {
32+
router := NewRouter(logger, createClient, canceler, dialTimeout, dialOpts...)
3333
state := NewState(logger, router, order, reqTTL, shardBuffer, sendBuffer)
3434
router.registerState(state)
3535
return &manager{

clientserver.go

-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,6 @@ func (srv *ClientServer) verify(req *Message) error {
306306

307307
func createClient(addr string, dialOpts []grpc.DialOption) (*broadcast.Client, error) {
308308
// necessary to ensure correct marshalling and unmarshalling of gorums messages
309-
// TODO: find a better solution
310309
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.CallContentSubtype(ContentSubtype)))
311310
opts := newManagerOptions()
312311
opts.grpcDialOpts = dialOpts

server.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,10 @@ type serverOptions struct {
180180
// running in a docker container it is useful to listen to the loopback
181181
// address and use forwarding from the host. If not this option is not given,
182182
// the listen address used on the gRPC listener will be used instead.
183-
listenAddr string
184-
allowList map[string]string
185-
auth *authentication.EllipticCurve
183+
listenAddr string
184+
allowList map[string]string
185+
auth *authentication.EllipticCurve
186+
grpcDialOpts []grpc.DialOption
186187
}
187188

188189
// ServerOption is used to change settings for the GorumsServer
@@ -244,6 +245,15 @@ func WithClientDialTimeout(dialTimeout time.Duration) ServerOption {
244245
}
245246
}
246247

248+
// WithServerGrpcDialOptions returns a ServerOption which sets any gRPC dial options
249+
// the Broadcast Router should use when connecting to each client.
250+
func WithServerGrpcDialOptions(opts ...grpc.DialOption) ServerOption {
251+
return func(o *serverOptions) {
252+
o.grpcDialOpts = make([]grpc.DialOption, 0, len(opts))
253+
o.grpcDialOpts = append(o.grpcDialOpts, opts...)
254+
}
255+
}
256+
247257
// WithShardBuffer returns a ServerOption which sets the buffer size
248258
// of the shards. A higher shard size uses more memory but saves time
249259
// when the number of broadcast requests is large.

0 commit comments

Comments
 (0)