Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit c980d7e

Browse files
authored
Merge pull request #177 from ipfs/refactor/use-global-pubsub-notifier
Refactor: use global pubsub notifier
2 parents a5fe0d4 + 994279b commit c980d7e

File tree

8 files changed

+223
-102
lines changed

8 files changed

+223
-102
lines changed

bitswap.go

+36-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
bsmsg "github.com/ipfs/go-bitswap/message"
1717
bsmq "github.com/ipfs/go-bitswap/messagequeue"
1818
bsnet "github.com/ipfs/go-bitswap/network"
19+
notifications "github.com/ipfs/go-bitswap/notifications"
1920
bspm "github.com/ipfs/go-bitswap/peermanager"
2021
bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
2122
bssession "github.com/ipfs/go-bitswap/session"
@@ -116,16 +117,18 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
116117
pqm := bspqm.New(ctx, network)
117118

118119
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
120+
notif notifications.PubSub,
119121
provSearchDelay time.Duration,
120122
rebroadcastDelay delay.D) bssm.Session {
121-
return bssession.New(ctx, id, wm, pm, srs, provSearchDelay, rebroadcastDelay)
123+
return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
122124
}
123125
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
124126
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
125127
}
126128
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
127129
return bssrs.New(ctx)
128130
}
131+
notif := notifications.New()
129132

130133
bs := &Bitswap{
131134
blockstore: bstore,
@@ -136,7 +139,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
136139
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
137140
wm: wm,
138141
pqm: pqm,
139-
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
142+
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
143+
notif: notif,
140144
counters: new(counters),
141145
dupMetric: dupHist,
142146
allMetric: allHist,
@@ -163,6 +167,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
163167
go func() {
164168
<-px.Closing() // process closes first
165169
cancelFunc()
170+
notif.Shutdown()
166171
}()
167172
procctx.CloseAfterContext(px, ctx) // parent cancelled first
168173

@@ -187,6 +192,9 @@ type Bitswap struct {
187192
// NB: ensure threadsafety
188193
blockstore blockstore.Blockstore
189194

195+
// manages channels of outgoing blocks for sessions
196+
notif notifications.PubSub
197+
190198
// newBlocks is a channel for newly added blocks to be provided to the
191199
// network. blocks pushed down this channel get buffered and fed to the
192200
// provideKeys channel later on to avoid too much network activity
@@ -307,18 +315,38 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
307315
// to the same node. We should address this soon, but i'm not going to do
308316
// it now as it requires more thought and isnt causing immediate problems.
309317

310-
// Send all blocks (including duplicates) to any sessions that want them.
318+
allKs := make([]cid.Cid, 0, len(blks))
319+
for _, b := range blks {
320+
allKs = append(allKs, b.Cid())
321+
}
322+
323+
wantedKs := allKs
324+
if len(blks) != len(wanted) {
325+
wantedKs = make([]cid.Cid, 0, len(wanted))
326+
for _, b := range wanted {
327+
wantedKs = append(wantedKs, b.Cid())
328+
}
329+
}
330+
331+
// Send all block keys (including duplicates) to any sessions that want them.
311332
// (The duplicates are needed by sessions for accounting purposes)
312-
bs.sm.ReceiveBlocksFrom(from, blks)
333+
bs.sm.ReceiveFrom(from, allKs)
313334

314-
// Send wanted blocks to decision engine
315-
bs.engine.AddBlocks(wanted)
335+
// Send wanted block keys to decision engine
336+
bs.engine.AddBlocks(wantedKs)
337+
338+
// Publish the block to any Bitswap clients that had requested blocks.
339+
// (the sessions use this pubsub mechanism to inform clients of received
340+
// blocks)
341+
for _, b := range wanted {
342+
bs.notif.Publish(b)
343+
}
316344

317345
// If the reprovider is enabled, send wanted blocks to reprovider
318346
if bs.provideEnabled {
319-
for _, b := range wanted {
347+
for _, k := range wantedKs {
320348
select {
321-
case bs.newBlocks <- b.Cid():
349+
case bs.newBlocks <- k:
322350
// send block off to be reprovided
323351
case <-bs.process.Closing():
324352
return bs.process.Close()

decision/engine.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/google/uuid"
1111
bsmsg "github.com/ipfs/go-bitswap/message"
1212
wl "github.com/ipfs/go-bitswap/wantlist"
13-
blocks "github.com/ipfs/go-block-format"
1413
cid "github.com/ipfs/go-cid"
1514
bstore "github.com/ipfs/go-ipfs-blockstore"
1615
logging "github.com/ipfs/go-log"
@@ -312,13 +311,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
312311
}
313312
}
314313

315-
func (e *Engine) addBlocks(blocks []blocks.Block) {
314+
func (e *Engine) addBlocks(ks []cid.Cid) {
316315
work := false
317316

318317
for _, l := range e.ledgerMap {
319318
l.lk.Lock()
320-
for _, block := range blocks {
321-
if entry, ok := l.WantListContains(block.Cid()); ok {
319+
for _, k := range ks {
320+
if entry, ok := l.WantListContains(k); ok {
322321
e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
323322
Identifier: entry.Cid,
324323
Priority: entry.Priority,
@@ -337,11 +336,11 @@ func (e *Engine) addBlocks(blocks []blocks.Block) {
337336
// AddBlocks is called when new blocks are received and added to a block store,
338337
// meaning there may be peers who want those blocks, so we should send the blocks
339338
// to them.
340-
func (e *Engine) AddBlocks(blocks []blocks.Block) {
339+
func (e *Engine) AddBlocks(ks []cid.Cid) {
341340
e.lock.Lock()
342341
defer e.lock.Unlock()
343342

344-
e.addBlocks(blocks)
343+
e.addBlocks(ks)
345344
}
346345

347346
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce

getter/getter.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -61,40 +61,56 @@ func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block,
6161
type WantFunc func(context.Context, []cid.Cid)
6262

6363
// AsyncGetBlocks take a set of block cids, a pubsub channel for incoming
64-
// blocks, a want function, and a close function,
65-
// and returns a channel of incoming blocks.
66-
func AsyncGetBlocks(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
64+
// blocks, a want function, and a close function, and returns a channel of
65+
// incoming blocks.
66+
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
67+
want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
68+
69+
// If there are no keys supplied, just return a closed channel
6770
if len(keys) == 0 {
6871
out := make(chan blocks.Block)
6972
close(out)
7073
return out, nil
7174
}
7275

76+
// Use a PubSub notifier to listen for incoming blocks for each key
7377
remaining := cid.NewSet()
7478
promise := notif.Subscribe(ctx, keys...)
7579
for _, k := range keys {
7680
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
7781
remaining.Add(k)
7882
}
7983

84+
// Send the want request for the keys to the network
8085
want(ctx, keys)
8186

8287
out := make(chan blocks.Block)
83-
go handleIncoming(ctx, remaining, promise, out, cwants)
88+
go handleIncoming(ctx, sessctx, remaining, promise, out, cwants)
8489
return out, nil
8590
}
8691

87-
func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {
92+
// Listens for incoming blocks, passing them to the out channel.
93+
// If the context is cancelled or the incoming channel closes, calls cfun with
94+
// any keys corresponding to blocks that were never received.
95+
func handleIncoming(ctx context.Context, sessctx context.Context, remaining *cid.Set,
96+
in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {
97+
8898
ctx, cancel := context.WithCancel(ctx)
99+
100+
// Clean up before exiting this function, and call the cancel function on
101+
// any remaining keys
89102
defer func() {
90103
cancel()
91104
close(out)
92105
// can't just defer this call on its own, arguments are resolved *when* the defer is created
93106
cfun(remaining.Keys())
94107
}()
108+
95109
for {
96110
select {
97111
case blk, ok := <-in:
112+
// If the channel is closed, we're done (note that PubSub closes
113+
// the channel once all the keys have been received)
98114
if !ok {
99115
return
100116
}
@@ -104,9 +120,13 @@ func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Bl
104120
case out <- blk:
105121
case <-ctx.Done():
106122
return
123+
case <-sessctx.Done():
124+
return
107125
}
108126
case <-ctx.Done():
109127
return
128+
case <-sessctx.Done():
129+
return
110130
}
111131
}
112132
}

notifications/notifications.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ func (ps *impl) Shutdown() {
6060
}
6161

6262
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
63-
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
64-
// blocks.
63+
// is closed if the |ctx| times out or is cancelled, or after receiving the blocks
64+
// corresponding to |keys|.
6565
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {
6666

6767
blocksCh := make(chan blocks.Block, len(keys))
@@ -82,6 +82,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
8282
default:
8383
}
8484

85+
// AddSubOnceEach listens for each key in the list, and closes the channel
86+
// once all keys have been received
8587
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
8688
go func() {
8789
defer func() {

0 commit comments

Comments
 (0)