Skip to content

Commit 8d0dcda

Browse files
authored
Merge pull request ipfs/go-bitswap#375 from ipfs/refactor/sess-ctx-cancels
Send CANCELs when session context is cancelled This commit was moved from ipfs/go-bitswap@26fbfbf
2 parents 497c51f + d03b4a0 commit 8d0dcda

File tree

3 files changed

+19
-14
lines changed

3 files changed

+19
-14
lines changed

bitswap/bitswap.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
139139
pm := bspm.New(ctx, peerQueueFactory, network.Self())
140140
pqm := bspqm.New(ctx, network)
141141

142-
sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
142+
sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
143143
sim *bssim.SessionInterestManager,
144144
pm bssession.PeerManager,
145145
bpm *bsbpm.BlockPresenceManager,
146146
notif notifications.PubSub,
147147
provSearchDelay time.Duration,
148148
rebroadcastDelay delay.D,
149149
self peer.ID) bssm.Session {
150-
return bssession.New(ctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
150+
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
151151
}
152152
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
153153
return bsspm.New(id, network.ConnectionManager())

bitswap/internal/session/session.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ type op struct {
9191
// info to, and who to request blocks from.
9292
type Session struct {
9393
// dependencies
94-
ctx context.Context
94+
bsctx context.Context // context for bitswap
95+
ctx context.Context // context for session
9596
pm PeerManager
9697
bpm *bsbpm.BlockPresenceManager
9798
sprm SessionPeerManager
@@ -124,7 +125,9 @@ type Session struct {
124125

125126
// New creates a new bitswap session whose lifetime is bounded by the
126127
// given context.
127-
func New(ctx context.Context,
128+
func New(
129+
bsctx context.Context, // context for bitswap
130+
ctx context.Context, // context for this session
128131
id uint64,
129132
sprm SessionPeerManager,
130133
providerFinder ProviderFinder,
@@ -138,6 +141,7 @@ func New(ctx context.Context,
138141
s := &Session{
139142
sw: newSessionWants(broadcastLiveWantsLimit),
140143
tickDelayReqs: make(chan time.Duration),
144+
bsctx: bsctx,
141145
ctx: ctx,
142146
pm: pm,
143147
bpm: bpm,
@@ -393,10 +397,11 @@ func (s *Session) handleShutdown() {
393397
// in anymore
394398
s.bpm.RemoveKeys(cancelKs)
395399

396-
// TODO: If the context is cancelled this won't actually send any CANCELs.
397-
// We should use a longer lived context to send out these CANCELs.
398-
// Send CANCEL to all peers for blocks that no session is interested in anymore
399-
s.pm.SendCancels(s.ctx, cancelKs)
400+
// Send CANCEL to all peers for blocks that no session is interested in
401+
// anymore.
402+
// Note: use bitswap context because session context has already been
403+
// cancelled.
404+
s.pm.SendCancels(s.bsctx, cancelKs)
400405
}
401406

402407
// handleReceive is called when the session receives blocks from a peer

bitswap/internal/session/session_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestSessionGetBlocks(t *testing.T) {
103103
notif := notifications.New()
104104
defer notif.Shutdown()
105105
id := testutil.GenerateSessionID()
106-
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
106+
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
107107
blockGenerator := blocksutil.NewBlockGenerator()
108108
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
109109
var cids []cid.Cid
@@ -198,7 +198,7 @@ func TestSessionFindMorePeers(t *testing.T) {
198198
notif := notifications.New()
199199
defer notif.Shutdown()
200200
id := testutil.GenerateSessionID()
201-
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
201+
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
202202
session.SetBaseTickDelay(200 * time.Microsecond)
203203
blockGenerator := blocksutil.NewBlockGenerator()
204204
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
@@ -272,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
272272
notif := notifications.New()
273273
defer notif.Shutdown()
274274
id := testutil.GenerateSessionID()
275-
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
275+
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
276276
blockGenerator := blocksutil.NewBlockGenerator()
277277
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
278278
var cids []cid.Cid
@@ -316,7 +316,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
316316
notif := notifications.New()
317317
defer notif.Shutdown()
318318
id := testutil.GenerateSessionID()
319-
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
319+
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
320320
blockGenerator := blocksutil.NewBlockGenerator()
321321
blks := blockGenerator.Blocks(4)
322322
var cids []cid.Cid
@@ -431,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
431431

432432
// Create a new session with its own context
433433
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
434-
session := New(sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
434+
session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
435435

436436
timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
437437
defer timerCancel()
@@ -472,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
472472
notif := notifications.New()
473473
defer notif.Shutdown()
474474
id := testutil.GenerateSessionID()
475-
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
475+
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
476476
blockGenerator := blocksutil.NewBlockGenerator()
477477
blks := blockGenerator.Blocks(2)
478478
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

0 commit comments

Comments
 (0)