Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 7888679

Browse files
authored
Merge pull request #317 from ipfs/fix/sesswantmgr-shutdown
wait for sessionWantSender to shutdown before completing session shutdown
2 parents ae75342 + 70c3111 commit 7888679

File tree

3 files changed

+39
-20
lines changed

3 files changed

+39
-20
lines changed

internal/session/session.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func New(ctx context.Context,
159159
periodicSearchDelay: periodicSearchDelay,
160160
self: self,
161161
}
162-
s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
162+
s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
163163

164164
go s.run(ctx)
165165

@@ -387,6 +387,9 @@ func (s *Session) handleShutdown() {
387387
s.idleTick.Stop()
388388
// Shut down the session peer manager
389389
s.sprm.Shutdown()
390+
// Shut down the sessionWantSender (blocks until sessionWantSender stops
391+
// sending)
392+
s.sws.Shutdown()
390393
// Remove the session from the want manager
391394
s.wm.RemoveSession(s.ctx, s.id)
392395
}

internal/session/sessionwantsender.go

+24-8
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,13 @@ type onPeersExhaustedFn func([]cid.Cid)
7171
// consults the peer response tracker (records which peers sent us blocks).
7272
//
7373
type sessionWantSender struct {
74-
// When the context is cancelled, sessionWantSender shuts down
74+
// The context is used when sending wants
7575
ctx context.Context
76+
// Called to shutdown the sessionWantSender
77+
shutdown func()
78+
// The sessionWantSender uses the closed channel to signal when it's
79+
// finished shutting down
80+
closed chan struct{}
7681
// The session ID
7782
sessionID uint64
7883
// A channel that collects incoming changes (events)
@@ -97,11 +102,14 @@ type sessionWantSender struct {
97102
onPeersExhausted onPeersExhaustedFn
98103
}
99104

100-
func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm SessionPeerManager,
105+
func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
101106
bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {
102107

108+
ctx, cancel := context.WithCancel(context.Background())
103109
sws := sessionWantSender{
104110
ctx: ctx,
111+
shutdown: cancel,
112+
closed: make(chan struct{}),
105113
sessionID: sid,
106114
changes: make(chan change, changesBufferSize),
107115
wants: make(map[cid.Cid]*wantInfo),
@@ -158,12 +166,25 @@ func (sws *sessionWantSender) Run() {
158166
case ch := <-sws.changes:
159167
sws.onChange([]change{ch})
160168
case <-sws.ctx.Done():
161-
sws.shutdown()
169+
// Unregister the session with the PeerManager
170+
sws.pm.UnregisterSession(sws.sessionID)
171+
172+
// Close the 'closed' channel to signal to Shutdown() that the run
173+
// loop has exited
174+
close(sws.closed)
162175
return
163176
}
164177
}
165178
}
166179

180+
// Shutdown the sessionWantSender
181+
func (sws *sessionWantSender) Shutdown() {
182+
// Signal to the run loop to stop processing
183+
sws.shutdown()
184+
// Wait for run loop to complete
185+
<-sws.closed
186+
}
187+
167188
// addChange adds a new change to the queue
168189
func (sws *sessionWantSender) addChange(c change) {
169190
select {
@@ -172,11 +193,6 @@ func (sws *sessionWantSender) addChange(c change) {
172193
}
173194
}
174195

175-
// shutdown unregisters the session with the PeerManager
176-
func (sws *sessionWantSender) shutdown() {
177-
sws.pm.UnregisterSession(sws.sessionID)
178-
}
179-
180196
// collectChanges collects all the changes that have occurred since the last
181197
// invocation of onChange
182198
func (sws *sessionWantSender) collectChanges(changes []change) []change {

internal/session/sessionwantsender_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func TestSendWants(t *testing.T) {
138138
bpm := bsbpm.New()
139139
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
140140
onPeersExhausted := func([]cid.Cid) {}
141-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
141+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
142142

143143
go spm.Run()
144144

@@ -176,7 +176,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
176176
bpm := bsbpm.New()
177177
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
178178
onPeersExhausted := func([]cid.Cid) {}
179-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
179+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
180180

181181
go spm.Run()
182182

@@ -234,7 +234,7 @@ func TestReceiveBlock(t *testing.T) {
234234
bpm := bsbpm.New()
235235
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
236236
onPeersExhausted := func([]cid.Cid) {}
237-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
237+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
238238

239239
go spm.Run()
240240

@@ -294,7 +294,7 @@ func TestPeerUnavailable(t *testing.T) {
294294
bpm := bsbpm.New()
295295
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
296296
onPeersExhausted := func([]cid.Cid) {}
297-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
297+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
298298

299299
go spm.Run()
300300

@@ -360,7 +360,7 @@ func TestPeersExhausted(t *testing.T) {
360360
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
361361

362362
ep := exhaustedPeers{}
363-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
363+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
364364

365365
go spm.Run()
366366

@@ -436,7 +436,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
436436
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
437437

438438
ep := exhaustedPeers{}
439-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
439+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
440440

441441
go spm.Run()
442442

@@ -484,7 +484,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
484484
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
485485

486486
ep := exhaustedPeers{}
487-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
487+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
488488

489489
go spm.Run()
490490

@@ -522,7 +522,7 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
522522
bpm := bsbpm.New()
523523
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
524524
onPeersExhausted := func([]cid.Cid) {}
525-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
525+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
526526

527527
go spm.Run()
528528

@@ -578,7 +578,7 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
578578
bpm := bsbpm.New()
579579
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
580580
onPeersExhausted := func([]cid.Cid) {}
581-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
581+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
582582

583583
go spm.Run()
584584

@@ -633,7 +633,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
633633
bpm := bsbpm.New()
634634
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
635635
onPeersExhausted := func([]cid.Cid) {}
636-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
636+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
637637

638638
go spm.Run()
639639

@@ -717,7 +717,7 @@ func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
717717
bpm := bsbpm.New()
718718
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
719719
onPeersExhausted := func([]cid.Cid) {}
720-
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
720+
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)
721721

722722
go spm.Run()
723723

0 commit comments

Comments
 (0)