Skip to content
This repository was archived by the owner on Apr 21, 2022. It is now read-only.

Commit 74d66a0

Browse files
authored
implement decaying tags. (#61)
1 parent 2738394 commit 74d66a0

File tree

6 files changed

+745
-104
lines changed

6 files changed

+745
-104
lines changed

connmgr.go

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,30 @@ var log = logging.Logger("connmgr")
2727
//
2828
// See configuration parameters in NewConnManager.
2929
type BasicConnMgr struct {
30-
highWater int
31-
lowWater int
32-
connCount int32
33-
gracePeriod time.Duration
34-
segments segments
30+
*decayer
31+
32+
cfg *BasicConnManagerConfig
33+
segments segments
3534

3635
plk sync.RWMutex
3736
protected map[peer.ID]map[string]struct{}
3837

39-
trimTrigger chan chan<- struct{}
38+
// channel-based semaphore that enforces only a single trim is in progress
39+
trimRunningCh chan struct{}
40+
trimTrigger chan chan<- struct{}
41+
connCount int32
4042

4143
lastTrimMu sync.RWMutex
4244
lastTrim time.Time
4345

44-
silencePeriod time.Duration
45-
4646
ctx context.Context
4747
cancel func()
4848
}
4949

50-
var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
50+
var (
51+
_ connmgr.ConnManager = (*BasicConnMgr)(nil)
52+
_ connmgr.Decayer = (*BasicConnMgr)(nil)
53+
)
5154

5255
type segment struct {
5356
sync.Mutex
@@ -80,6 +83,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
8083
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
8184
temp: true,
8285
tags: make(map[string]int),
86+
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
8387
conns: make(map[network.Conn]time.Time),
8488
}
8589
s.peers[p] = pi
@@ -92,15 +96,32 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
9296
// their connections terminated) until 'low watermark' peers remain.
9397
// * grace is the amount of time a newly opened connection is given before it becomes
9498
// subject to pruning.
95-
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
99+
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
96100
ctx, cancel := context.WithCancel(context.Background())
97-
cm := &BasicConnMgr{
101+
102+
cfg := &BasicConnManagerConfig{
98103
highWater: hi,
99104
lowWater: low,
100105
gracePeriod: grace,
106+
silencePeriod: SilencePeriod,
107+
}
108+
109+
for _, o := range opts {
110+
// TODO we're ignoring errors from options because we have no way to
111+
// return them, or otherwise act on them.
112+
_ = o(cfg)
113+
}
114+
115+
if cfg.decayer == nil {
116+
// Set the default decayer config.
117+
cfg.decayer = (&DecayerCfg{}).WithDefaults()
118+
}
119+
120+
cm := &BasicConnMgr{
121+
cfg: cfg,
122+
trimRunningCh: make(chan struct{}, 1),
101123
trimTrigger: make(chan chan<- struct{}),
102124
protected: make(map[peer.ID]map[string]struct{}, 16),
103-
silencePeriod: SilencePeriod,
104125
ctx: ctx,
105126
cancel: cancel,
106127
segments: func() (ret segments) {
@@ -113,11 +134,17 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
113134
}(),
114135
}
115136

137+
decay, _ := NewDecayer(cfg.decayer, cm)
138+
cm.decayer = decay
139+
116140
go cm.background()
117141
return cm
118142
}
119143

120144
func (cm *BasicConnMgr) Close() error {
145+
if err := cm.decayer.Close(); err != nil {
146+
return err
147+
}
121148
cm.cancel()
122149
return nil
123150
}
@@ -151,10 +178,12 @@ func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
151178

152179
// peerInfo stores metadata for a given peer.
153180
type peerInfo struct {
154-
id peer.ID
155-
tags map[string]int // value for each tag
156-
value int // cached sum of all tag values
157-
temp bool // this is a temporary entry holding early tags, and awaiting connections
181+
id peer.ID
182+
tags map[string]int // value for each tag
183+
decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags
184+
185+
value int // cached sum of all tag values
186+
temp bool // this is a temporary entry holding early tags, and awaiting connections
158187

159188
conns map[network.Conn]time.Time // start time of each connection
160189

@@ -199,7 +228,7 @@ func (cm *BasicConnMgr) background() {
199228
var waiting chan<- struct{}
200229
select {
201230
case <-ticker.C:
202-
if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
231+
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
203232
// Below high water, skip.
204233
continue
205234
}
@@ -235,7 +264,7 @@ func (cm *BasicConnMgr) trim() {
235264
cm.lastTrimMu.RUnlock()
236265

237266
// skip this attempt to trim if the last one just took place.
238-
if time.Since(lastTrim) < cm.silencePeriod {
267+
if time.Since(lastTrim) < cm.cfg.silencePeriod {
239268
return
240269
}
241270

@@ -256,21 +285,21 @@ func (cm *BasicConnMgr) trim() {
256285
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
257286
// connections to close.
258287
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
259-
if cm.lowWater == 0 || cm.highWater == 0 {
288+
if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
260289
// disabled
261290
return nil
262291
}
263292

264293
nconns := int(atomic.LoadInt32(&cm.connCount))
265-
if nconns <= cm.lowWater {
294+
if nconns <= cm.cfg.lowWater {
266295
log.Info("open connection count below limit")
267296
return nil
268297
}
269298

270299
npeers := cm.segments.countPeers()
271300
candidates := make([]*peerInfo, 0, npeers)
272301
ncandidates := 0
273-
gracePeriodStart := time.Now().Add(-cm.gracePeriod)
302+
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
274303

275304
cm.plk.RLock()
276305
for _, s := range cm.segments {
@@ -291,7 +320,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
291320
}
292321
cm.plk.RUnlock()
293322

294-
if ncandidates < cm.lowWater {
323+
if ncandidates < cm.cfg.lowWater {
295324
log.Info("open connection count above limit but too many are in the grace period")
296325
// We have too many connections but fewer than lowWater
297326
// connections out of the grace period.
@@ -311,7 +340,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
311340
return left.value < right.value
312341
})
313342

314-
target := ncandidates - cm.lowWater
343+
target := ncandidates - cm.cfg.lowWater
315344

316345
// slightly overallocate because we may have more than one conns per peer
317346
selected := make([]network.Conn, 0, target+10)
@@ -363,6 +392,9 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
363392
for t, v := range pi.tags {
364393
out.Tags[t] = v
365394
}
395+
for t, v := range pi.decaying {
396+
out.Tags[t.name] = v.Value
397+
}
366398
for c, t := range pi.conns {
367399
out.Conns[c.RemoteMultiaddr().String()] = t
368400
}
@@ -439,10 +471,10 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
439471
cm.lastTrimMu.RUnlock()
440472

441473
return CMInfo{
442-
HighWater: cm.highWater,
443-
LowWater: cm.lowWater,
474+
HighWater: cm.cfg.highWater,
475+
LowWater: cm.cfg.lowWater,
444476
LastTrim: lastTrim,
445-
GracePeriod: cm.gracePeriod,
477+
GracePeriod: cm.cfg.gracePeriod,
446478
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
447479
}
448480
}
@@ -478,6 +510,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
478510
id: id,
479511
firstSeen: time.Now(),
480512
tags: make(map[string]int),
513+
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
481514
conns: make(map[network.Conn]time.Time),
482515
}
483516
s.peers[id] = pinfo

0 commit comments

Comments
 (0)