Skip to content

Commit f8f71f9

Browse files
fix(broadcast): better error handling and option for machineID
1 parent d128be2 commit f8f71f9

14 files changed

+101
-36
lines changed

broadcast.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type broadcastServer struct {
1818
viewMutex sync.RWMutex
1919
id uint32
2020
addr string
21+
machineID uint64
2122
view RawConfiguration
2223
createBroadcaster func(m BroadcastMetadata, o *BroadcastOrchestrator, b EnqueueBroadcast) Broadcaster
2324
orchestrator *BroadcastOrchestrator
@@ -29,9 +30,10 @@ func (srv *Server) GetStats() broadcast.Metrics {
2930
return srv.broadcastSrv.manager.GetStats()
3031
}
3132

32-
func newBroadcastServer(logger *slog.Logger, order map[string]int) *broadcastServer {
33+
func newBroadcastServer(logger *slog.Logger, order map[string]int, machineID uint64) *broadcastServer {
3334
srv := &broadcastServer{
34-
logger: logger,
35+
logger: logger,
36+
machineID: machineID,
3537
}
3638
srv.manager = broadcast.NewBroadcastManager(logger, createClient, srv.canceler, order)
3739
return srv
@@ -59,7 +61,7 @@ func (srv *broadcastServer) addAddr(addr string) {
5961
h := fnv.New32a()
6062
_, _ = h.Write([]byte(srv.addr))
6163
srv.id = h.Sum32()
62-
srv.manager.AddAddr(srv.id, srv.addr)
64+
srv.manager.AddAddr(srv.id, srv.addr, srv.machineID)
6365
}
6466

6567
const (

broadcast/manager.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type Manager interface {
1616
Cancel(uint64, []string) error
1717
Done(uint64)
1818
NewBroadcastID() uint64
19-
AddAddr(id uint32, addr string)
19+
AddAddr(id uint32, addr string, machineID uint64)
2020
AddHandler(method string, handler any)
2121
Close() error
2222
ResetState()
@@ -149,10 +149,10 @@ func (mgr *manager) NewBroadcastID() uint64 {
149149
return mgr.state.snowflake.NewBroadcastID()
150150
}
151151

152-
func (mgr *manager) AddAddr(id uint32, addr string) {
152+
func (mgr *manager) AddAddr(id uint32, addr string, machineID uint64) {
153153
mgr.router.id = id
154154
mgr.router.addr = addr
155-
mgr.state.snowflake = NewSnowflake(addr)
155+
mgr.state.snowflake = NewSnowflake(machineID)
156156
}
157157

158158
func (mgr *manager) AddHandler(method string, handler any) {

broadcast/processor.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type metadata struct {
3131
Sent bool
3232
ResponseMsg protoreflect.ProtoMessage
3333
ResponseErr error
34-
SendFn func(protoreflect.ProtoMessage, error)
34+
SendFn func(protoreflect.ProtoMessage, error) error
3535
IsBroadcastClient bool
3636
}
3737

@@ -188,7 +188,10 @@ func (m *metadata) send(resp protoreflect.ProtoMessage, err error) error {
188188
if !m.hasReceivedClientRequest() {
189189
return MissingClientReqErr{}
190190
}
191-
m.SendFn(resp, err)
191+
// error is intentionally ignored. We have not setup retry logic for failed
192+
// deliveries to clients. Responding with nil will stop the broadcast request
193+
// which is needed to prevent many stale goroutines.
194+
_ = m.SendFn(resp, err)
192195
return nil
193196
}
194197

broadcast/processor_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (r *mockRouter) Send(broadcastID uint64, addr, method string, req any) erro
3737
func (r *mockRouter) Connect(addr string) {}*/
3838

3939
func TestHandleBroadcastOption1(t *testing.T) {
40-
snowflake := NewSnowflake("127.0.0.1:8080")
40+
snowflake := NewSnowflake(0)
4141
broadcastID := snowflake.NewBroadcastID()
4242

4343
var tests = []struct {
@@ -127,7 +127,7 @@ func TestHandleBroadcastOption1(t *testing.T) {
127127
clientMsg := Content{
128128
BroadcastID: broadcastID,
129129
IsBroadcastClient: true,
130-
SendFn: func(resp protoreflect.ProtoMessage, err error) {},
130+
SendFn: func(resp protoreflect.ProtoMessage, err error) error { return nil },
131131
ReceiveChan: make(chan shardResponse),
132132
}
133133
req.sendChan <- clientMsg
@@ -145,7 +145,7 @@ func TestHandleBroadcastOption1(t *testing.T) {
145145
}
146146

147147
func TestHandleBroadcastCall1(t *testing.T) {
148-
snowflake := NewSnowflake("127.0.0.1:8080")
148+
snowflake := NewSnowflake(0)
149149
broadcastID := snowflake.NewBroadcastID()
150150

151151
var tests = []struct {
@@ -249,7 +249,7 @@ func TestHandleBroadcastCall1(t *testing.T) {
249249
}
250250

251251
func BenchmarkHandle1(b *testing.B) {
252-
snowflake := NewSnowflake("127.0.0.1:8080")
252+
snowflake := NewSnowflake(0)
253253
originMethod := "testMethod"
254254
router := &mockRouter{
255255
returnError: false,
@@ -264,7 +264,7 @@ func BenchmarkHandle1(b *testing.B) {
264264
},
265265
BroadcastID: broadcastID,
266266
}
267-
sendFn := func(resp protoreflect.ProtoMessage, err error) {}
267+
sendFn := func(resp protoreflect.ProtoMessage, err error) error { return nil }
268268

269269
b.ResetTimer()
270270
b.Run("RequestHandler", func(b *testing.B) {

broadcast/request_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (r *mockRouter) Send(broadcastID uint64, addr, method string, req any) erro
3838
func (r *mockRouter) Connect(addr string) {}
3939

4040
func TestHandleBroadcastOption(t *testing.T) {
41-
snowflake := NewSnowflake("127.0.0.1:8080")
41+
snowflake := NewSnowflake(0)
4242
broadcastID := snowflake.NewBroadcastID()
4343

4444
var tests = []struct {
@@ -127,7 +127,7 @@ func TestHandleBroadcastOption(t *testing.T) {
127127
clientMsg := Content{
128128
BroadcastID: broadcastID,
129129
IsBroadcastClient: true,
130-
SendFn: func(resp protoreflect.ProtoMessage, err error) {},
130+
SendFn: func(resp protoreflect.ProtoMessage, err error) error { return nil },
131131
ReceiveChan: make(chan shardResponse),
132132
}
133133
req.sendChan <- clientMsg
@@ -145,7 +145,7 @@ func TestHandleBroadcastOption(t *testing.T) {
145145
}
146146

147147
func TestHandleBroadcastCall(t *testing.T) {
148-
snowflake := NewSnowflake("127.0.0.1:8080")
148+
snowflake := NewSnowflake(0)
149149
broadcastID := snowflake.NewBroadcastID()
150150

151151
var tests = []struct {
@@ -248,7 +248,7 @@ func TestHandleBroadcastCall(t *testing.T) {
248248
}
249249

250250
func BenchmarkHandle2(b *testing.B) {
251-
snowflake := NewSnowflake("127.0.0.1:8080")
251+
snowflake := NewSnowflake(0)
252252
originMethod := "testMethod"
253253
router := &mockRouter{
254254
returnError: false,
@@ -263,7 +263,7 @@ func BenchmarkHandle2(b *testing.B) {
263263
},
264264
BroadcastID: broadcastID,
265265
}
266-
sendFn := func(resp protoreflect.ProtoMessage, err error) {}
266+
sendFn := func(resp protoreflect.ProtoMessage, err error) error { return nil }
267267

268268
b.ResetTimer()
269269
b.Run("RequestHandler", func(b *testing.B) {

broadcast/shard.go

-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package broadcast
22

33
import (
44
"context"
5-
"log/slog"
65
"sync"
76
"time"
87
)
@@ -126,7 +125,6 @@ func (s *shard) run(sendBuffer int) {
126125
}
127126
} else {
128127
if msg.IsCancellation {
129-
slog.Error("this is the reason")
130128
// ignore cancellations if a broadcast request
131129
// has not been created yet
132130
continue

broadcast/shard_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (r *slowRouter) Send(broadcastID uint64, addr, method string, req any) erro
3333
func (r *slowRouter) Connect(addr string) {}
3434

3535
func TestShard(t *testing.T) {
36-
snowflake := NewSnowflake("127.0.0.1:8080")
36+
snowflake := NewSnowflake(0)
3737
broadcastID := snowflake.NewBroadcastID()
3838
router := &slowRouter{
3939
returnError: false,
@@ -151,7 +151,7 @@ func TestShard(t *testing.T) {
151151
}
152152

153153
func BenchmarkShard(b *testing.B) {
154-
snowflake := NewSnowflake("127.0.0.1:8080")
154+
snowflake := NewSnowflake(0)
155155
router := &slowRouter{
156156
returnError: false,
157157
}

broadcast/snowflake.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package broadcast
22

33
import (
4+
"log/slog"
45
"math/rand"
56
"sync"
67
"time"
@@ -16,8 +17,8 @@ type Snowflake struct {
1617
}
1718

1819
const (
19-
maxShard = float32(1 << 4)
20-
maxMachineID = float32(1 << 12)
20+
MaxMachineID = uint16(1 << 12)
21+
maxShard = uint8(1 << 4)
2122
maxSequenceNum = uint32(1 << 18)
2223
bitMaskTimestamp = uint64((1<<30)-1) << 34
2324
bitMaskShardID = uint64((1<<4)-1) << 30
@@ -26,10 +27,15 @@ const (
2627
epoch = "2024-01-01T00:00:00"
2728
)
2829

29-
func NewSnowflake(addr string) *Snowflake {
30+
func NewSnowflake(id uint64) *Snowflake {
31+
if id < 0 || id >= uint64(MaxMachineID) {
32+
newID := uint64(rand.Int31n(int32(MaxMachineID)))
33+
slog.Warn("snowflakeID: provided machienID is higher than max or lower than min. A random ID will be assigned instead.", "max", MaxMachineID, "min", 0, "givenID", id, "assignedID", newID)
34+
id = newID
35+
}
3036
timestamp, _ := time.Parse("2006-01-02T15:04:05", epoch)
3137
return &Snowflake{
32-
MachineID: uint64(rand.Int31n(int32(maxMachineID))),
38+
MachineID: id,
3339
SequenceNum: 0,
3440
epoch: timestamp,
3541
//sequenceNum: uint32(maxSequenceNum * rand.Float32()),

broadcast/snowflake_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package broadcast
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestBroadcastID(t *testing.T) {
8+
if MaxMachineID != 4096 {
9+
t.Errorf("maxMachineID is hardcoded in test. want: %v, got: %v", 4096, MaxMachineID)
10+
}
11+
if maxSequenceNum != 262144 {
12+
t.Errorf("maxSequenceNum is hardcoded in test. want: %v, got: %v", 262144, maxSequenceNum)
13+
}
14+
if maxShard != 16 {
15+
t.Errorf("maxShard is hardcoded in test. want: %v, got: %v", 16, maxShard)
16+
}
17+
// intentionally provide an illegal machineID. A random machineID should be given instead.
18+
snowflake := NewSnowflake(8000)
19+
machineID := snowflake.MachineID
20+
timestampDistribution := make(map[uint32]int)
21+
maxN := 262144 // = 2^18
22+
for j := 1; j < 3*maxN; j++ {
23+
i := j % maxN
24+
broadcastID := snowflake.NewBroadcastID()
25+
timestamp, shard, m, n := DecodeBroadcastID(broadcastID)
26+
if i != int(n) {
27+
t.Errorf("wrong sequence number. want: %v, got: %v", i, n)
28+
}
29+
if m >= 4096 {
30+
t.Errorf("machine ID cannot be higher than max. want: %v, got: %v", 4095, m)
31+
}
32+
if m != uint16(machineID) {
33+
t.Errorf("wrong machine ID. want: %v, got: %v", machineID, m)
34+
}
35+
if shard >= 16 {
36+
t.Errorf("cannot have higher shard than max. want: %v, got: %v", 15, shard)
37+
}
38+
if n >= uint32(maxN) {
39+
t.Errorf("sequence number cannot be higher than max. want: %v, got: %v", maxN, n)
40+
}
41+
timestampDistribution[timestamp]++
42+
}
43+
for k, v := range timestampDistribution {
44+
if v > maxN {
45+
t.Errorf("cannot have more than maxN in a second. want: %v, got: %v", maxN, k)
46+
}
47+
}
48+
}

broadcast/state.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ type Content struct {
160160
OriginMethod string
161161
CurrentMethod string
162162
ReceiveChan chan shardResponse
163-
SendFn func(resp protoreflect.ProtoMessage, err error)
163+
SendFn func(resp protoreflect.ProtoMessage, err error) error
164164
Ctx context.Context
165165
CancelCtx context.CancelFunc
166166
Run func(context.Context, func(Msg) error)

handler.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ func createRequest(msg *broadcast.Content, ctx ServerCtx, in *Message, finished
9797
}
9898
}
9999

100-
func createSendFn(msgID uint64, method string, finished chan<- *Message, ctx ServerCtx) func(resp protoreflect.ProtoMessage, err error) {
101-
return func(resp protoreflect.ProtoMessage, err error) {
100+
func createSendFn(msgID uint64, method string, finished chan<- *Message, ctx ServerCtx) func(resp protoreflect.ProtoMessage, err error) error {
101+
return func(resp protoreflect.ProtoMessage, err error) error {
102102
md := &ordering.Metadata{
103103
MessageID: msgID,
104104
Method: method,
105105
}
106106
msg := WrapMessage(md, resp, err)
107-
SendMessage(ctx, finished, msg)
107+
return SendMessage(ctx, finished, msg)
108108
}
109109
}
110110

mgr.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func NewRawManager(opts ...ManagerOption) *RawManager {
5353
if m.logger != nil {
5454
m.logger.Printf("ready")
5555
}
56-
m.snowflake = broadcast.NewSnowflake(m.opts.addr)
56+
m.snowflake = broadcast.NewSnowflake(m.opts.machineID)
5757
return m
5858
}
5959

opts.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type managerOptions struct {
1919
metadata metadata.MD
2020
perNodeMD func(uint32) metadata.MD
2121
publicKey string
22-
addr string
22+
machineID uint64
2323
}
2424

2525
func newManagerOptions() managerOptions {
@@ -104,8 +104,8 @@ func WithPublicKey(publicKey string) ManagerOption {
104104
}
105105
}
106106

107-
func WithAddr(addr string) ManagerOption {
107+
func WithMachineID(id uint64) ManagerOption {
108108
return func(o *managerOptions) {
109-
o.addr = addr
109+
o.machineID = id
110110
}
111111
}

server.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net"
77
"sync"
88

9+
"github.com/relab/gorums/broadcast"
910
"github.com/relab/gorums/ordering"
1011
"google.golang.org/grpc"
1112
"google.golang.org/grpc/codes"
@@ -111,6 +112,13 @@ type serverOptions struct {
111112
connectCallback func(context.Context)
112113
logger *slog.Logger
113114
executionOrder map[string]int
115+
machineID uint64
116+
}
117+
118+
var defaultServerOptions = serverOptions{
119+
// Provide an illegal machineID to avoid unintentional collisions.
120+
// 0 is a valid MachineID and should not be used as default.
121+
machineID: uint64(broadcast.MaxMachineID) + 1,
114122
}
115123

116124
// ServerOption is used to change settings for the GorumsServer
@@ -167,14 +175,14 @@ type Server struct {
167175
// This function is intended for internal Gorums use.
168176
// You should call `NewServer` in the generated code instead.
169177
func NewServer(opts ...ServerOption) *Server {
170-
var serverOpts serverOptions
178+
serverOpts := defaultServerOptions
171179
for _, opt := range opts {
172180
opt(&serverOpts)
173181
}
174182
s := &Server{
175183
srv: newOrderingServer(&serverOpts),
176184
grpcServer: grpc.NewServer(serverOpts.grpcOpts...),
177-
broadcastSrv: newBroadcastServer(serverOpts.logger, serverOpts.executionOrder),
185+
broadcastSrv: newBroadcastServer(serverOpts.logger, serverOpts.executionOrder, serverOpts.machineID),
178186
}
179187
ordering.RegisterGorumsServer(s.grpcServer, s.srv)
180188
return s

0 commit comments

Comments
 (0)