Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 6d1e10d

Browse files
authored
Merge pull request #139 from ipfs/fix/nits
fix some naming nits and broadcast on search
2 parents 0289549 + eb28a2e commit 6d1e10d

File tree

1 file changed

+54
-53
lines changed

1 file changed

+54
-53
lines changed

session/session.go

+54-53
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,18 @@ type Session struct {
7777
tickDelayReqs chan time.Duration
7878

7979
// do not touch outside run loop
80-
tofetch *cidQueue
81-
interest *lru.Cache
82-
pastWants *cidQueue
83-
liveWants map[cid.Cid]time.Time
84-
tick *time.Timer
85-
rebroadcast *time.Timer
86-
baseTickDelay time.Duration
87-
latTotal time.Duration
88-
fetchcnt int
89-
consecutiveTicks int
90-
provSearchDelay time.Duration
91-
rebroadcastDelay delay.D
80+
tofetch *cidQueue
81+
interest *lru.Cache
82+
pastWants *cidQueue
83+
liveWants map[cid.Cid]time.Time
84+
idleTick *time.Timer
85+
periodicSearchTimer *time.Timer
86+
baseTickDelay time.Duration
87+
latTotal time.Duration
88+
fetchcnt int
89+
consecutiveTicks int
90+
initialSearchDelay time.Duration
91+
periodicSearchDelay delay.D
9292
// identifiers
9393
notif notifications.PubSub
9494
uuid logging.Loggable
@@ -102,28 +102,28 @@ func New(ctx context.Context,
102102
wm WantManager,
103103
pm PeerManager,
104104
srs RequestSplitter,
105-
provSearchDelay time.Duration,
106-
rebroadcastDelay delay.D) *Session {
105+
initialSearchDelay time.Duration,
106+
periodicSearchDelay delay.D) *Session {
107107
s := &Session{
108-
liveWants: make(map[cid.Cid]time.Time),
109-
newReqs: make(chan []cid.Cid),
110-
cancelKeys: make(chan []cid.Cid),
111-
tofetch: newCidQueue(),
112-
pastWants: newCidQueue(),
113-
interestReqs: make(chan interestReq),
114-
latencyReqs: make(chan chan time.Duration),
115-
tickDelayReqs: make(chan time.Duration),
116-
ctx: ctx,
117-
wm: wm,
118-
pm: pm,
119-
srs: srs,
120-
incoming: make(chan blkRecv),
121-
notif: notifications.New(),
122-
uuid: loggables.Uuid("GetBlockRequest"),
123-
baseTickDelay: time.Millisecond * 500,
124-
id: id,
125-
provSearchDelay: provSearchDelay,
126-
rebroadcastDelay: rebroadcastDelay,
108+
liveWants: make(map[cid.Cid]time.Time),
109+
newReqs: make(chan []cid.Cid),
110+
cancelKeys: make(chan []cid.Cid),
111+
tofetch: newCidQueue(),
112+
pastWants: newCidQueue(),
113+
interestReqs: make(chan interestReq),
114+
latencyReqs: make(chan chan time.Duration),
115+
tickDelayReqs: make(chan time.Duration),
116+
ctx: ctx,
117+
wm: wm,
118+
pm: pm,
119+
srs: srs,
120+
incoming: make(chan blkRecv),
121+
notif: notifications.New(),
122+
uuid: loggables.Uuid("GetBlockRequest"),
123+
baseTickDelay: time.Millisecond * 500,
124+
id: id,
125+
initialSearchDelay: initialSearchDelay,
126+
periodicSearchDelay: periodicSearchDelay,
127127
}
128128

129129
cache, _ := lru.New(2048)
@@ -239,8 +239,8 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
239239
// Session run loop -- everything function below here should not be called
240240
// of this loop
241241
func (s *Session) run(ctx context.Context) {
242-
s.tick = time.NewTimer(s.provSearchDelay)
243-
s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get())
242+
s.idleTick = time.NewTimer(s.initialSearchDelay)
243+
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
244244
for {
245245
select {
246246
case blk := <-s.incoming:
@@ -253,10 +253,10 @@ func (s *Session) run(ctx context.Context) {
253253
s.handleNewRequest(ctx, keys)
254254
case keys := <-s.cancelKeys:
255255
s.handleCancel(keys)
256-
case <-s.tick.C:
257-
s.handleTick(ctx)
258-
case <-s.rebroadcast.C:
259-
s.handleRebroadcast(ctx)
256+
case <-s.idleTick.C:
257+
s.handleIdleTick(ctx)
258+
case <-s.periodicSearchTimer.C:
259+
s.handlePeriodicSearch(ctx)
260260
case lwchk := <-s.interestReqs:
261261
lwchk.resp <- s.cidIsWanted(lwchk.c)
262262
case resp := <-s.latencyReqs:
@@ -271,15 +271,15 @@ func (s *Session) run(ctx context.Context) {
271271
}
272272

273273
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
274-
s.tick.Stop()
274+
s.idleTick.Stop()
275275

276276
if blk.from != "" {
277277
s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
278278
}
279279

280280
s.receiveBlock(ctx, blk.blk)
281281

282-
s.resetTick()
282+
s.resetIdleTick()
283283
}
284284

285285
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
@@ -307,7 +307,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
307307
}
308308
}
309309

310-
func (s *Session) handleTick(ctx context.Context) {
310+
func (s *Session) handleIdleTick(ctx context.Context) {
311311

312312
live := make([]cid.Cid, 0, len(s.liveWants))
313313
now := time.Now()
@@ -321,28 +321,29 @@ func (s *Session) handleTick(ctx context.Context) {
321321
s.wm.WantBlocks(ctx, live, nil, s.id)
322322

323323
// do no find providers on consecutive ticks
324-
// -- just rely on periodic rebroadcast
324+
// -- just rely on periodic search widening
325325
if len(live) > 0 && (s.consecutiveTicks == 0) {
326326
s.pm.FindMorePeers(ctx, live[0])
327327
}
328-
s.resetTick()
328+
s.resetIdleTick()
329329

330330
if len(s.liveWants) > 0 {
331331
s.consecutiveTicks++
332332
}
333333
}
334334

335-
func (s *Session) handleRebroadcast(ctx context.Context) {
336-
337-
if len(s.liveWants) == 0 {
335+
func (s *Session) handlePeriodicSearch(ctx context.Context) {
336+
randomWant := s.randomLiveWant()
337+
if !randomWant.Defined() {
338338
return
339339
}
340340

341341
// TODO: come up with a better strategy for determining when to search
342342
// for new providers for blocks.
343-
s.pm.FindMorePeers(ctx, s.randomLiveWant())
343+
s.pm.FindMorePeers(ctx, randomWant)
344+
s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
344345

345-
s.rebroadcast.Reset(s.rebroadcastDelay.Get())
346+
s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
346347
}
347348

348349
func (s *Session) randomLiveWant() cid.Cid {
@@ -357,7 +358,7 @@ func (s *Session) randomLiveWant() cid.Cid {
357358
return cid.Cid{}
358359
}
359360
func (s *Session) handleShutdown() {
360-
s.tick.Stop()
361+
s.idleTick.Stop()
361362
s.notif.Shutdown()
362363

363364
live := make([]cid.Cid, 0, len(s.liveWants))
@@ -436,16 +437,16 @@ func (s *Session) averageLatency() time.Duration {
436437
return s.latTotal / time.Duration(s.fetchcnt)
437438
}
438439

439-
func (s *Session) resetTick() {
440+
func (s *Session) resetIdleTick() {
440441
var tickDelay time.Duration
441442
if s.latTotal == 0 {
442-
tickDelay = s.provSearchDelay
443+
tickDelay = s.initialSearchDelay
443444
} else {
444445
avLat := s.averageLatency()
445446
tickDelay = s.baseTickDelay + (3 * avLat)
446447
}
447448
tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
448-
s.tick.Reset(tickDelay)
449+
s.idleTick.Reset(tickDelay)
449450
}
450451

451452
func (s *Session) wantBudget() int {

0 commit comments

Comments
 (0)