Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit d39c760

Browse files
authored
Merge pull request #326 from ipfs/feat/faster-send-blocks
avoid copying messages and improve logging
2 parents 288ceff + 8c7bf92 commit d39c760

File tree

4 files changed

+80
-47
lines changed

4 files changed

+80
-47
lines changed

bitswap.go

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
)
3838

3939
var log = logging.Logger("bitswap")
40+
var sflog = log.Desugar()
4041

4142
var _ exchange.SessionExchange = (*Bitswap)(nil)
4243

internal/decision/engine.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -733,8 +733,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
733733

734734
// Remove sent block presences from the want list for the peer
735735
for _, bp := range m.BlockPresences() {
736-
// TODO: record block presence bytes as well?
737-
// l.SentBytes(?)
736+
// Don't record sent data. We reserve that for data blocks.
738737
if bp.Type == pb.Message_Have {
739738
l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
740739
}

internal/messagequeue/messagequeue.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -466,23 +466,43 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
466466

467467
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
468468
// Save some CPU cycles and allocations if log level is higher than debug
469-
if ce := sflog.Check(zap.DebugLevel, "Bitswap -> send wants"); ce == nil {
469+
if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
470470
return
471471
}
472472

473473
self := mq.network.Self()
474474
for _, e := range wantlist {
475475
if e.Cancel {
476476
if e.WantType == pb.Message_Wantlist_Have {
477-
log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid)
477+
log.Debugw("sent message",
478+
"type", "CANCEL_WANT_HAVE",
479+
"cid", e.Cid,
480+
"local", self,
481+
"to", mq.p,
482+
)
478483
} else {
479-
log.Debugw("Bitswap -> cancel-block", "local", self, "to", mq.p, "cid", e.Cid)
484+
log.Debugw("sent message",
485+
"type", "CANCEL_WANT_BLOCK",
486+
"cid", e.Cid,
487+
"local", self,
488+
"to", mq.p,
489+
)
480490
}
481491
} else {
482492
if e.WantType == pb.Message_Wantlist_Have {
483-
log.Debugw("Bitswap -> want-have", "local", self, "to", mq.p, "cid", e.Cid)
493+
log.Debugw("sent message",
494+
"type", "WANT_HAVE",
495+
"cid", e.Cid,
496+
"local", self,
497+
"to", mq.p,
498+
)
484499
} else {
485-
log.Debugw("Bitswap -> want-block", "local", self, "to", mq.p, "cid", e.Cid)
500+
log.Debugw("sent message",
501+
"type", "WANT_BLOCK",
502+
"cid", e.Cid,
503+
"local", self,
504+
"to", mq.p,
505+
)
486506
}
487507
}
488508
}

workers.go

+53-40
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import (
55
"fmt"
66

77
engine "github.com/ipfs/go-bitswap/internal/decision"
8-
bsmsg "github.com/ipfs/go-bitswap/message"
98
pb "github.com/ipfs/go-bitswap/message/pb"
109
cid "github.com/ipfs/go-cid"
1110
process "github.com/jbenet/goprocess"
1211
procctx "github.com/jbenet/goprocess/context"
12+
"go.uber.org/zap"
1313
)
1414

1515
// TaskWorkerCount is the total number of simultaneous threads sending
@@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
5252
continue
5353
}
5454

55-
// update the BS ledger to reflect sent message
56-
// TODO: Should only track *useful* messages in ledger
57-
outgoing := bsmsg.New(false)
58-
for _, block := range envelope.Message.Blocks() {
59-
log.Debugw("Bitswap.TaskWorker.Work",
60-
"Target", envelope.Peer,
61-
"Block", block.Cid(),
62-
)
63-
outgoing.AddBlock(block)
64-
}
65-
for _, blockPresence := range envelope.Message.BlockPresences() {
66-
outgoing.AddBlockPresence(blockPresence.Cid, blockPresence.Type)
67-
}
6855
// TODO: Only record message as sent if there was no error?
69-
bs.engine.MessageSent(envelope.Peer, outgoing)
70-
56+
// Ideally, yes. But we'd need some way to trigger a retry and/or drop
57+
// the peer.
58+
bs.engine.MessageSent(envelope.Peer, envelope.Message)
7159
bs.sendBlocks(ctx, envelope)
72-
bs.counterLk.Lock()
73-
for _, block := range envelope.Message.Blocks() {
74-
bs.counters.blocksSent++
75-
bs.counters.dataSent += uint64(len(block.RawData()))
76-
}
77-
bs.counterLk.Unlock()
7860
case <-ctx.Done():
7961
return
8062
}
@@ -84,41 +66,72 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
8466
}
8567
}
8668

87-
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
88-
// Blocks need to be sent synchronously to maintain proper backpressure
89-
// throughout the network stack
90-
defer env.Sent()
69+
func (bs *Bitswap) logOutgoingBlocks(env *engine.Envelope) {
70+
if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
71+
return
72+
}
9173

92-
msgSize := 0
93-
msg := bsmsg.New(false)
74+
self := bs.network.Self()
9475

9576
for _, blockPresence := range env.Message.BlockPresences() {
9677
c := blockPresence.Cid
9778
switch blockPresence.Type {
9879
case pb.Message_Have:
99-
log.Infof("Sending HAVE %s to %s", c.String()[2:8], env.Peer)
80+
log.Debugw("sent message",
81+
"type", "HAVE",
82+
"cid", c,
83+
"local", self,
84+
"to", env.Peer,
85+
)
10086
case pb.Message_DontHave:
101-
log.Infof("Sending DONT_HAVE %s to %s", c.String()[2:8], env.Peer)
87+
log.Debugw("sent message",
88+
"type", "DONT_HAVE",
89+
"cid", c,
90+
"local", self,
91+
"to", env.Peer,
92+
)
10293
default:
10394
panic(fmt.Sprintf("unrecognized BlockPresence type %v", blockPresence.Type))
10495
}
10596

106-
msgSize += bsmsg.BlockPresenceSize(c)
107-
msg.AddBlockPresence(c, blockPresence.Type)
10897
}
10998
for _, block := range env.Message.Blocks() {
110-
msgSize += len(block.RawData())
111-
msg.AddBlock(block)
112-
log.Infof("Sending block %s to %s", block, env.Peer)
99+
log.Debugw("sent message",
100+
"type", "BLOCK",
101+
"cid", block.Cid(),
102+
"local", self,
103+
"to", env.Peer,
104+
)
113105
}
106+
}
114107

115-
bs.sentHistogram.Observe(float64(msgSize))
116-
err := bs.network.SendMessage(ctx, env.Peer, msg)
108+
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
109+
// Blocks need to be sent synchronously to maintain proper backpressure
110+
// throughout the network stack
111+
defer env.Sent()
112+
113+
err := bs.network.SendMessage(ctx, env.Peer, env.Message)
117114
if err != nil {
118-
// log.Infof("sendblock error: %s", err)
119-
log.Errorf("SendMessage error: %s. size: %d. block-presence length: %d", err, msg.Size(), len(env.Message.BlockPresences()))
115+
log.Debugw("failed to send blocks message",
116+
"peer", env.Peer,
117+
"error", err,
118+
)
119+
return
120+
}
121+
122+
bs.logOutgoingBlocks(env)
123+
124+
dataSent := 0
125+
blocks := env.Message.Blocks()
126+
for _, b := range blocks {
127+
dataSent += len(b.RawData())
120128
}
121-
log.Infof("Sent message to %s", env.Peer)
129+
bs.counterLk.Lock()
130+
bs.counters.blocksSent += uint64(len(blocks))
131+
bs.counters.dataSent += uint64(dataSent)
132+
bs.counterLk.Unlock()
133+
bs.sentHistogram.Observe(float64(env.Message.Size()))
134+
log.Debugw("sent message", "peer", env.Peer)
122135
}
123136

124137
func (bs *Bitswap) provideWorker(px process.Process) {

0 commit comments

Comments (0)