Skip to content

Commit 116531c

Browse files
feat(broadcast): added more fields to broadcast metadata
1 parent 0c937f7 commit 116531c

File tree

6 files changed

+38
-20
lines changed

6 files changed

+38
-20
lines changed

broadcast.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,16 @@ type Broadcaster interface{}
170170

171171
type BroadcastMetadata struct {
172172
BroadcastID uint64
173-
IsBroadcastClient bool // type of sender, could be: Client or Server
174-
SenderAddr string // address of last hop
175-
OriginAddr string // address of the origin
176-
OriginMethod string // the first method called by the origin
177-
Method string // the current method
178-
Digest []byte // digest of original message sent by client
173+
IsBroadcastClient bool // type of sender, could be: Client or Server
174+
SenderAddr string // address of last hop
175+
OriginAddr string // address of the origin
176+
OriginMethod string // the first method called by the origin
177+
Method string // the current method
178+
Digest []byte // digest of original message sent by client
179+
Timestamp time.Time // timestamp in seconds when the broadcast request was issued by the client/server
180+
ShardID uint16 // ID of the shard handling the broadcast request
181+
MachineID uint16 // ID of the client/server that issued the broadcast request
182+
SequenceNo uint32 // sequence number of the broadcast request from that particular client/server. Will roll over when reaching max.
179183
}
180184

181185
func newBroadcastMetadata(md *ordering.Metadata) BroadcastMetadata {
@@ -187,12 +191,17 @@ func newBroadcastMetadata(md *ordering.Metadata) BroadcastMetadata {
187191
if len(tmp) >= 1 {
188192
m = tmp[len(tmp)-1]
189193
}
194+
timestamp, shardID, machineID, sequenceNo := broadcast.DecodeBroadcastID(md.BroadcastMsg.BroadcastID)
190195
return BroadcastMetadata{
191196
BroadcastID: md.BroadcastMsg.BroadcastID,
192197
IsBroadcastClient: md.BroadcastMsg.IsBroadcastClient,
193198
SenderAddr: md.BroadcastMsg.SenderAddr,
194199
OriginAddr: md.BroadcastMsg.OriginAddr,
195200
OriginMethod: md.BroadcastMsg.OriginMethod,
196201
Method: m,
202+
Timestamp: broadcast.Epoch().Add(time.Duration(timestamp) * time.Second),
203+
ShardID: shardID,
204+
MachineID: machineID,
205+
SequenceNo: sequenceNo,
197206
}
198207
}

broadcast/consts.go

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ type BroadcastOptions struct {
1919

2020
const (
2121
BroadcastID string = "broadcastID"
22+
// special origin addr used in creating a broadcast request from a server
23+
ServerOriginAddr string = "server"
2224
)
2325

2426
type ServerHandler func(ctx context.Context, in protoreflect.ProtoMessage, broadcastID uint64, originAddr, originMethod string, options BroadcastOptions, id uint32, addr string)

broadcast/processor.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@ import (
88
)
99

1010
type BroadcastProcessor struct {
11-
broadcastID uint64
12-
router Router
13-
broadcastChan chan Msg
14-
sendChan chan Content
15-
ctx context.Context
16-
cancelFunc context.CancelFunc
17-
started time.Time
18-
ended time.Time
11+
broadcastID uint64
12+
router Router
13+
broadcastChan chan Msg
14+
sendChan chan Content
15+
ctx context.Context
16+
cancelFunc context.CancelFunc
17+
started time.Time
18+
ended time.Time
19+
20+
// handled by shard
1921
cancellationCtx context.Context
20-
cancellationCtxCancel context.CancelFunc // should only be called by the shard
22+
cancellationCtxCancel context.CancelFunc
2123
sentCancellation bool
2224

25+
// ordering
2326
executionOrder map[string]int
2427
orderIndex int
2528
outOfOrderMsgs map[string][]Content

broadcast/router.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (r *BroadcastRouter) routeClientReply(broadcastID uint64, addr, method stri
102102
}
103103

104104
func (r *BroadcastRouter) getClient(addr string) (*Client, error) {
105-
if addr == "" {
105+
if addr == "" || addr == ServerOriginAddr {
106106
return nil, InvalidAddrErr{addr: addr}
107107
}
108108
// fast path:

broadcast/snowflake.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,22 @@ const (
2626
epoch = "2024-01-01T00:00:00"
2727
)
2828

29+
func Epoch() time.Time {
30+
timestamp, _ := time.Parse("2006-01-02T15:04:05", epoch)
31+
return timestamp
32+
}
33+
2934
func NewSnowflake(id uint64) *Snowflake {
3035
if id < 0 || id >= uint64(MaxMachineID) {
3136
//newID := uint64(rand.Int31n(int32(MaxMachineID)))
3237
//slog.Warn("snowflakeID: provided machineID is higher than max or lower than min. A random ID will be assigned instead.", "max", MaxMachineID, "min", 0, "givenID", id, "assignedID", newID)
3338
//id = newID
3439
id = uint64(rand.Int31n(int32(MaxMachineID)))
3540
}
36-
timestamp, _ := time.Parse("2006-01-02T15:04:05", epoch)
3741
return &Snowflake{
3842
MachineID: id,
3943
SequenceNum: 0,
40-
epoch: timestamp,
44+
epoch: Epoch(),
4145
//sequenceNum: uint32(maxSequenceNum * rand.Float32()),
4246
}
4347
}
@@ -70,7 +74,7 @@ start:
7074
return t | shard | m | n
7175
}
7276

73-
func DecodeBroadcastID(broadcastID uint64) (uint32, uint16, uint16, uint32) {
77+
func DecodeBroadcastID(broadcastID uint64) (timestamp uint32, shardID uint16, machineID uint16, sequenceNo uint32) {
7478
t := (broadcastID & bitMaskTimestamp) >> 34
7579
shard := (broadcastID & bitMaskShardID) >> 30
7680
m := (broadcastID & bitMaskMachineID) >> 18

handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func (srv *broadcastServer) serverBroadcastHandler(method string, req protorefle
180180
Message: req,
181181
Method: method,
182182
BroadcastID: srv.manager.NewBroadcastID(),
183-
OriginAddr: "server",
183+
OriginAddr: broadcast.ServerOriginAddr,
184184
IsBroadcastClient: false,
185185
}
186186
srv.viewMutex.RLock()

0 commit comments

Comments
 (0)