@@ -11,25 +11,10 @@ import (
11
11
12
12
const NumShards = 16
13
13
14
- type shardMetrics struct {
15
- totalMsgs uint64
16
- numMsgs uint64
17
- droppedMsgs uint64
18
- numBroadcastMsgs uint64
19
- droppedBroadcastMsgs uint64
20
- numReqs uint64
21
- finishedReqs uint64
22
- lifetimes [][]time.Time
23
- avgLifetime time.Duration
24
- minLifetime time.Duration
25
- maxLifetime time.Duration
26
- }
27
-
28
14
type shard struct {
29
15
mut sync.RWMutex
30
16
id int
31
17
parentCtx context.Context
32
- metrics shardMetrics
33
18
procs map [uint64 ]* BroadcastProcessor
34
19
reqTTL time.Duration
35
20
router Router
@@ -62,7 +47,6 @@ func createShards(ctx context.Context, shardBuffer, sendBuffer int, router Route
62
47
}
63
48
64
49
func (s * shard ) handleMsg (msg * Content ) shardResponse {
65
- //s.metrics.numMsgs++
66
50
// Optimization: first check with a read lock if the processor already exists
67
51
if p , ok := s .getProcessor (msg .BroadcastID ); ok {
68
52
return s .process (p , msg )
@@ -120,7 +104,6 @@ func (s *shard) process(p *BroadcastProcessor, msg *Content) shardResponse {
120
104
// must check if the req is done to prevent deadlock
121
105
select {
122
106
case <- p .ctx .Done ():
123
- //s.metrics.droppedMsgs++
124
107
return shardResponse {
125
108
err : AlreadyProcessedErr {},
126
109
}
@@ -138,11 +121,9 @@ func (s *shard) process(p *BroadcastProcessor, msg *Content) shardResponse {
138
121
}
139
122
140
123
func (s * shard ) handleBMsg (msg * Msg ) {
141
- //s.metrics.numBroadcastMsgs++
142
124
if req , ok := s .getProcessor (msg .BroadcastID ); ok {
143
125
select {
144
126
case <- req .ctx .Done ():
145
- //s.metrics.droppedBroadcastMsgs++
146
127
case req .broadcastChan <- msg :
147
128
}
148
129
}
@@ -178,7 +159,6 @@ func (s *shard) addProcessor(msg *Content) (*BroadcastProcessor, bool) {
178
159
// check size of s.reqs. If too big, then perform necessary cleanup.
179
160
// should only affect the current shard and not the others.
180
161
ctx , cancel := context .WithTimeout (s .parentCtx , s .reqTTL )
181
- //req := &BroadcastRequest{
182
162
var logger * slog.Logger
183
163
if s .logger != nil {
184
164
logger = s .logger .With (logging .BroadcastID (msg .BroadcastID ))
@@ -229,38 +209,3 @@ func (s *shard) gc(nextGC time.Duration) {
229
209
}
230
210
s .procs = newReqs
231
211
}
232
-
233
- func (s * shard ) getStats () shardMetrics {
234
- s .mut .RLock ()
235
- defer s .mut .RUnlock ()
236
- s .metrics .numReqs = uint64 (len (s .procs ))
237
- s .metrics .lifetimes = make ([][]time.Time , len (s .procs ))
238
- s .metrics .totalMsgs = s .metrics .numBroadcastMsgs + s .metrics .numMsgs
239
- minLifetime := 100 * time .Hour
240
- maxLifetime := time .Duration (0 )
241
- totalLifetime := time .Duration (0 )
242
- i := 0
243
- for _ , req := range s .procs {
244
- select {
245
- case <- req .ctx .Done ():
246
- s .metrics .finishedReqs ++
247
- default :
248
- }
249
- s .metrics .lifetimes [i ] = []time.Time {req .started , req .ended }
250
- lifetime := req .ended .Sub (req .started )
251
- if lifetime > 0 {
252
- if lifetime < minLifetime {
253
- minLifetime = lifetime
254
- }
255
- if lifetime > maxLifetime {
256
- maxLifetime = lifetime
257
- }
258
- totalLifetime += lifetime
259
- }
260
- i ++
261
- }
262
- s .metrics .minLifetime = minLifetime
263
- s .metrics .maxLifetime = maxLifetime
264
- s .metrics .avgLifetime = totalLifetime
265
- return s .metrics
266
- }
0 commit comments