Skip to content

Commit

Permalink
fix(bitswap): blockpresencemanager leak
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Feb 4, 2025
1 parent 9e257a7 commit e854475
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 38 deletions.
6 changes: 3 additions & 3 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,9 @@ func (s *Session) handleReceive(ks []cid.Cid) {
// Record latency
s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

// Inform the SessionInterestManager that this session is no longer
// expecting to receive the wanted keys
s.sim.RemoveSessionWants(s.id, wanted)
// Inform the SessionManager that this session is no longer expecting to
// receive the wanted keys, since we now have them
s.sm.CancelSessionWants(s.id, wanted)

s.idleTick.Stop()

Expand Down
22 changes: 13 additions & 9 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ const blockSize = 4

type mockSessionMgr struct {
lk sync.Mutex
sim *bssim.SessionInterestManager
removeSession bool
cancels []cid.Cid
}

func newMockSessionMgr() *mockSessionMgr {
return &mockSessionMgr{}
func newMockSessionMgr(sim *bssim.SessionInterestManager) *mockSessionMgr {
return &mockSessionMgr{
sim: sim,
}
}

func (msm *mockSessionMgr) removeSessionCalled() bool {
Expand All @@ -54,6 +57,7 @@ func (msm *mockSessionMgr) CancelSessionWants(sid uint64, wants []cid.Cid) {
msm.lk.Lock()
defer msm.lk.Unlock()
msm.cancels = append(msm.cancels, wants...)
msm.sim.RemoveSessionWants(sid, wants)
}

func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
Expand Down Expand Up @@ -164,7 +168,7 @@ func TestSessionGetBlocks(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
var cids []cid.Cid
Expand Down Expand Up @@ -244,7 +248,7 @@ func TestSessionFindMorePeers(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond)
blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
Expand Down Expand Up @@ -314,7 +318,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blks := random.BlocksOfSize(broadcastLiveWantsLimit+5, blockSize)
var cids []cid.Cid
Expand Down Expand Up @@ -351,7 +355,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blks := random.BlocksOfSize(4, blockSize)
var cids []cid.Cid
Expand Down Expand Up @@ -449,7 +453,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand Down Expand Up @@ -492,7 +496,7 @@ func TestSessionOnShutdownCalled(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand All @@ -519,7 +523,7 @@ func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := random.SequenceNext()
sm := newMockSessionMgr()
sm := newMockSessionMgr(sim)
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blks := random.BlocksOfSize(2, blockSize)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
Expand Down
43 changes: 29 additions & 14 deletions bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
bsspm "github.com/ipfs/boxo/bitswap/client/internal/sessionpeermanager"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-test/random"
Expand Down Expand Up @@ -147,7 +148,8 @@ func TestSendWants(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -181,7 +183,8 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -231,7 +234,8 @@ func TestReceiveBlock(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -284,7 +288,8 @@ func TestCancelWants(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -319,7 +324,8 @@ func TestRegisterSessionWithPeerManager(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -357,7 +363,8 @@ func TestProtectConnFirstPeerToSendWantedBlock(t *testing.T) {
pm := newMockPeerManager()
fpt := newFakePeerTagger()
fpm := bsspm.New(1, fpt)
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -411,7 +418,8 @@ func TestPeerUnavailable(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -470,7 +478,8 @@ func TestPeersExhausted(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

Expand Down Expand Up @@ -543,7 +552,8 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

Expand Down Expand Up @@ -590,7 +600,8 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

Expand Down Expand Up @@ -628,7 +639,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -680,7 +692,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -733,7 +746,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down Expand Up @@ -809,7 +823,8 @@ func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
const sid = uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
sim := bssim.New()
swc := newMockSessionMgr(sim)
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
Expand Down
5 changes: 0 additions & 5 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
// their contents. If the caller needs to preserve a copy of the lists it
// should make a copy before calling ReceiveFrom.
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// Send CANCEL to all peers with want-have / want-block. This needs to be
// done before filtering out CIDs that peers are no longer interested in,
// to ensure they are removed from PeerManager and PeerQueue want lists.
sm.peerManager.SendCancels(ctx, blks)

// Keep only the keys that at least one session wants
keys := sm.sessionInterestManager.FilterInterests(blks, haves, dontHaves)
blks = keys[0]
Expand Down
15 changes: 8 additions & 7 deletions bitswap/client/internal/sessionmanager/sessionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid
fs.ks = append(fs.ks, ks...)
fs.wantBlocks = append(fs.wantBlocks, wantBlocks...)
fs.wantHaves = append(fs.wantHaves, wantHaves...)
fs.sm.CancelSessionWants(fs.id, ks)
}

func (fs *fakeSession) Shutdown() {
Expand Down Expand Up @@ -132,13 +133,6 @@ func TestReceiveFrom(t *testing.T) {
sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
if len(firstSession.ks) == 0 ||
len(secondSession.ks) > 0 ||
len(thirdSession.ks) == 0 {
t.Fatal("should have received blocks but didn't")
}

sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{})
if len(firstSession.wantBlocks) == 0 ||
len(secondSession.wantBlocks) > 0 ||
Expand All @@ -153,6 +147,13 @@ func TestReceiveFrom(t *testing.T) {
t.Fatal("should have received want-haves but didn't")
}

sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
if len(firstSession.ks) == 0 ||
len(secondSession.ks) > 0 ||
len(thirdSession.ks) == 0 {
t.Fatal("should have received blocks but didn't")
}

require.Len(t, pm.cancelled(), 1, "should have sent cancel for received blocks")
}

Expand Down

0 comments on commit e854475

Please sign in to comment.