Commit 553d5f9

Merge pull request ipfs/go-bitswap#261 from ipfs/fix/prune-session-peers
Prune peers that send too many consecutive DONT_HAVEs
This commit was moved from ipfs/go-bitswap@9ddd13e
2 parents 6c49ac8 + ea2e295 commit 553d5f9
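
In short, this change makes a session count consecutive DONT_HAVE responses per peer and remove the peer from the session once it has sent more than peerDontHaveLimit (16) DONT_HAVEs in a row; any HAVE or block received from that peer resets its count. A minimal sketch of that counting rule follows. The tracker type and function names are hypothetical stand-ins for illustration and are not go-bitswap's API; only peerDontHaveLimit and the reset/prune behavior come from the diff below.

package main

import "fmt"

// peerDontHaveLimit mirrors the constant introduced by this commit: once a
// peer has sent this many DONT_HAVEs in a row, the next one prunes it.
const peerDontHaveLimit = 16

// dontHaveTracker is a hypothetical stand-in for the session's
// peerConsecutiveDontHaves map; it is not part of go-bitswap's API.
type dontHaveTracker struct {
	consecutive map[string]int // peer ID -> consecutive DONT_HAVE count
}

func newDontHaveTracker() *dontHaveTracker {
	return &dontHaveTracker{consecutive: make(map[string]int)}
}

// onDontHave records another consecutive DONT_HAVE from the peer and reports
// whether the peer should now be pruned (the same check as in processUpdates).
func (t *dontHaveTracker) onDontHave(p string) bool {
	if t.consecutive[p] == peerDontHaveLimit {
		return true
	}
	t.consecutive[p]++
	return false
}

// onHaveOrBlock resets the counter: a HAVE or a received block breaks the
// run of consecutive DONT_HAVEs.
func (t *dontHaveTracker) onHaveOrBlock(p string) {
	delete(t.consecutive, p)
}

func main() {
	tr := newDontHaveTracker()
	for i := 0; i < peerDontHaveLimit+1; i++ {
		if tr.onDontHave("peerA") {
			fmt.Printf("peerA pruned after %d consecutive DONT_HAVEs\n", i+1)
		}
	}
}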

2 files changed, +237 -10 lines

bitswap/internal/session/sessionwantsender.go (+42 -10)

@@ -9,8 +9,13 @@ import (
 	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
-// Maximum number of changes to accept before blocking
-const changesBufferSize = 128
+const (
+	// Maximum number of changes to accept before blocking
+	changesBufferSize = 128
+	// If the session receives this many DONT_HAVEs in a row from a peer,
+	// it prunes the peer from the session
+	peerDontHaveLimit = 16
+)
 
 // BlockPresence indicates whether a peer has a block.
 // Note that the order is important, we decide which peer to send a want to
@@ -76,13 +81,14 @@ type sessionWantSender struct {
 	changes chan change
 	// Information about each want indexed by CID
 	wants map[cid.Cid]*wantInfo
+	// Keeps track of how many consecutive DONT_HAVEs a peer has sent
+	peerConsecutiveDontHaves map[peer.ID]int
 	// Tracks which peers we have send want-block to
 	swbt *sentWantBlocksTracker
 	// Maintains a list of peers and whether they are connected
 	peerAvlMgr *peerAvailabilityManager
 	// Tracks the number of blocks each peer sent us
 	peerRspTrkr *peerResponseTracker
-
 	// Sends wants to peers
 	pm PeerManager
 	// Keeps track of which peer has / doesn't have a block
@@ -97,13 +103,14 @@ func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, bpm *
 	onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {
 
 	spm := sessionWantSender{
-		ctx:         ctx,
-		sessionID:   sid,
-		changes:     make(chan change, changesBufferSize),
-		wants:       make(map[cid.Cid]*wantInfo),
-		swbt:        newSentWantBlocksTracker(),
-		peerAvlMgr:  newPeerAvailabilityManager(),
-		peerRspTrkr: newPeerResponseTracker(),
+		ctx:                      ctx,
+		sessionID:                sid,
+		changes:                  make(chan change, changesBufferSize),
+		wants:                    make(map[cid.Cid]*wantInfo),
+		peerConsecutiveDontHaves: make(map[peer.ID]int),
+		swbt:                     newSentWantBlocksTracker(),
+		peerAvlMgr:               newPeerAvailabilityManager(),
+		peerRspTrkr:              newPeerResponseTracker(),
 
 		pm:  pm,
 		bpm: bpm,
@@ -258,13 +265,22 @@ func (spm *sessionWantSender) processAvailability(availability map[peer.ID]bool)
 				if isNowAvailable {
 					newlyAvailable = append(newlyAvailable, p)
 				}
+				// Reset the count of consecutive DONT_HAVEs received from the
+				// peer
+				delete(spm.peerConsecutiveDontHaves, p)
 			}
 		}
 	}
 
 	return newlyAvailable
 }
 
+// isAvailable indicates whether the peer is available and whether
+// it's been tracked by the Session (used by the tests)
+func (spm *sessionWantSender) isAvailable(p peer.ID) (bool, bool) {
+	return spm.peerAvlMgr.isAvailable(p)
+}
+
 // trackWant creates a new entry in the map of CID -> want info
 func (spm *sessionWantSender) trackWant(c cid.Cid) {
 	// fmt.Printf("trackWant %s\n", lu.C(c))
@@ -285,6 +301,7 @@ func (spm *sessionWantSender) trackWant(c cid.Cid) {
 
 // processUpdates processes incoming blocks and HAVE / DONT_HAVEs
 func (spm *sessionWantSender) processUpdates(updates []update) {
+	prunePeers := make(map[peer.ID]struct{})
 	dontHaves := cid.NewSet()
 	for _, upd := range updates {
 		// TODO: If there is a timeout for the want from the peer, remove want.sentTo
@@ -308,12 +325,20 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
 					spm.setWantSentTo(c, "")
 				}
 			}
+
+			// Track the number of consecutive DONT_HAVEs each peer receives
+			if spm.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
+				prunePeers[upd.from] = struct{}{}
+			} else {
+				spm.peerConsecutiveDontHaves[upd.from]++
+			}
 		}
 
 		// For each HAVE
 		for _, c := range upd.haves {
 			// Update the block presence for the peer
 			spm.updateWantBlockPresence(c, upd.from)
+			delete(spm.peerConsecutiveDontHaves, upd.from)
 		}
 
 		// For each received block
@@ -325,6 +350,7 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
 				// us the block
 				spm.peerRspTrkr.receivedBlockFrom(upd.from)
 			}
+			delete(spm.peerConsecutiveDontHaves, upd.from)
 		}
 	}
 
@@ -337,6 +363,12 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
 			spm.onPeersExhausted(newlyExhausted)
 		}
 	}
+
+	// If any peers have sent us too many consecutive DONT_HAVEs, remove them
+	// from the session
+	for p := range prunePeers {
+		spm.SignalAvailability(p, false)
+	}
 }
 
 // convenience structs for passing around want-blocks and want-haves for a peer
bitswap/internal/session/sessionwantsender_test.go (+195)

@@ -346,3 +346,198 @@ func TestPeersExhausted(t *testing.T) {
 		t.Fatal("Wrong keys")
 	}
 }
+
+func TestConsecutiveDontHaveLimit(t *testing.T) {
+	cids := testutil.GenerateCids(peerDontHaveLimit + 10)
+	p := testutil.GeneratePeers(1)[0]
+	sid := uint64(1)
+	pm := newMockPeerManager()
+	bpm := bsbpm.New()
+	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
+	onPeersExhausted := func([]cid.Cid) {}
+	spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
+
+	go spm.Run()
+
+	// Add all cids as wants
+	spm.Add(cids)
+
+	// Receive a HAVE from peer (adds it to the session)
+	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
+	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+
+	// Receive DONT_HAVEs from peer that do not exceed limit
+	for _, c := range cids[1:peerDontHaveLimit] {
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+
+	// Receive DONT_HAVEs from peer that exceed limit
+	for _, c := range cids[peerDontHaveLimit:] {
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Session should remove peer
+	if avail, _ := spm.isAvailable(p); avail {
+		t.Fatal("Expected peer not to be available")
+	}
+}
+
+func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
+	cids := testutil.GenerateCids(peerDontHaveLimit + 10)
+	p := testutil.GeneratePeers(1)[0]
+	sid := uint64(1)
+	pm := newMockPeerManager()
+	bpm := bsbpm.New()
+	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
+	onPeersExhausted := func([]cid.Cid) {}
+	spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
+
+	go spm.Run()
+
+	// Add all cids as wants
+	spm.Add(cids)
+
+	// Receive a HAVE from peer (adds it to the session)
+	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
+	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+
+	// Receive DONT_HAVE then HAVE then DONT_HAVE from peer,
+	// where consecutive DONT_HAVEs would have exceeded limit
+	// (but they are not consecutive)
+	for _, c := range cids[1:peerDontHaveLimit] {
+		// DONT_HAVEs
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+	for _, c := range cids[peerDontHaveLimit : peerDontHaveLimit+1] {
+		// HAVEs
+		bpm.ReceiveFrom(p, []cid.Cid{c}, []cid.Cid{})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}, false)
+	}
+	for _, c := range cids[peerDontHaveLimit+1:] {
+		// DONT_HAVEs
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+}
+
+func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
+	cids := testutil.GenerateCids(peerDontHaveLimit + 10)
+	p := testutil.GeneratePeers(1)[0]
+	sid := uint64(1)
+	pm := newMockPeerManager()
+	bpm := bsbpm.New()
+	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
+	onPeersExhausted := func([]cid.Cid) {}
+	spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
+
+	go spm.Run()
+
+	// Add all cids as wants
+	spm.Add(cids)
+
+	// Receive a HAVE from peer (adds it to the session)
+	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
+	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+
+	// Receive DONT_HAVEs from peer that exceed limit
+	for _, c := range cids[1 : peerDontHaveLimit+2] {
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Session should remove peer
+	if avail, _ := spm.isAvailable(p); avail {
+		t.Fatal("Expected peer not to be available")
+	}
+
+	// Receive a HAVE from peer (adds it back into the session)
+	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
+	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+
+	cids2 := testutil.GenerateCids(peerDontHaveLimit + 10)
+
+	// Receive DONT_HAVEs from peer that don't exceed limit
+	for _, c := range cids2[1:peerDontHaveLimit] {
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Peer should be available
+	if avail, ok := spm.isAvailable(p); !ok || !avail {
+		t.Fatal("Expected peer to be available")
+	}
+
+	// Receive DONT_HAVEs from peer that exceed limit
+	for _, c := range cids2[peerDontHaveLimit:] {
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
+	}
+
+	// Wait for processing to complete
+	time.Sleep(5 * time.Millisecond)
+
+	// Session should remove peer
+	if avail, _ := spm.isAvailable(p); avail {
+		t.Fatal("Expected peer not to be available")
+	}
+}
