@@ -5,11 +5,11 @@ import (
5
5
"fmt"
6
6
7
7
engine "github.com/ipfs/go-bitswap/internal/decision"
8
- bsmsg "github.com/ipfs/go-bitswap/message"
9
8
pb "github.com/ipfs/go-bitswap/message/pb"
10
9
cid "github.com/ipfs/go-cid"
11
10
process "github.com/jbenet/goprocess"
12
11
procctx "github.com/jbenet/goprocess/context"
12
+ "go.uber.org/zap"
13
13
)
14
14
15
15
// TaskWorkerCount is the total number of simultaneous threads sending
@@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
52
52
continue
53
53
}
54
54
55
- // update the BS ledger to reflect sent message
56
- // TODO: Should only track *useful* messages in ledger
57
- outgoing := bsmsg .New (false )
58
- for _ , block := range envelope .Message .Blocks () {
59
- log .Debugw ("Bitswap.TaskWorker.Work" ,
60
- "Target" , envelope .Peer ,
61
- "Block" , block .Cid (),
62
- )
63
- outgoing .AddBlock (block )
64
- }
65
- for _ , blockPresence := range envelope .Message .BlockPresences () {
66
- outgoing .AddBlockPresence (blockPresence .Cid , blockPresence .Type )
67
- }
68
55
// TODO: Only record message as sent if there was no error?
69
- bs .engine .MessageSent (envelope .Peer , outgoing )
70
-
56
+ // Ideally, yes. But we'd need some way to trigger a retry and/or drop
57
+ // the peer.
58
+ bs .engine .MessageSent (envelope .Peer , envelope .Message )
71
59
bs .sendBlocks (ctx , envelope )
72
- bs .counterLk .Lock ()
73
- for _ , block := range envelope .Message .Blocks () {
74
- bs .counters .blocksSent ++
75
- bs .counters .dataSent += uint64 (len (block .RawData ()))
76
- }
77
- bs .counterLk .Unlock ()
78
60
case <- ctx .Done ():
79
61
return
80
62
}
@@ -84,41 +66,67 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
84
66
}
85
67
}
86
68
87
- func (bs * Bitswap ) sendBlocks (ctx context.Context , env * engine.Envelope ) {
88
- // Blocks need to be sent synchronously to maintain proper backpressure
89
- // throughout the network stack
90
- defer env .Sent ()
91
-
92
- msgSize := 0
93
- msg := bsmsg .New (false )
69
+ func (bs * Bitswap ) logOutgoingBlocks (env * engine.Envelope ) {
70
+ if ce := sflog .Check (zap .DebugLevel , "Bitswap -> send blocks" ); ce == nil {
71
+ return
72
+ }
94
73
95
74
for _ , blockPresence := range env .Message .BlockPresences () {
96
75
c := blockPresence .Cid
97
76
switch blockPresence .Type {
98
77
case pb .Message_Have :
99
- log .Infof ("Sending HAVE %s to %s" , c .String ()[2 :8 ], env .Peer )
78
+ log .Debugw ("sending message" ,
79
+ "type" , "HAVE" ,
80
+ "cid" , c ,
81
+ "peer" , env .Peer ,
82
+ )
100
83
case pb .Message_DontHave :
101
- log .Infof ("Sending DONT_HAVE %s to %s" , c .String ()[2 :8 ], env .Peer )
84
+ log .Debugw ("sending message" ,
85
+ "type" , "DONT_HAVE" ,
86
+ "cid" , c ,
87
+ "peer" , env .Peer ,
88
+ )
102
89
default :
103
90
panic (fmt .Sprintf ("unrecognized BlockPresence type %v" , blockPresence .Type ))
104
91
}
105
92
106
- msgSize += bsmsg .BlockPresenceSize (c )
107
- msg .AddBlockPresence (c , blockPresence .Type )
108
93
}
109
94
for _ , block := range env .Message .Blocks () {
110
- msgSize += len (block .RawData ())
111
- msg .AddBlock (block )
112
- log .Infof ("Sending block %s to %s" , block , env .Peer )
95
+ log .Debugw ("sending message" ,
96
+ "type" , "BLOCK" ,
97
+ "cid" , block .Cid (),
98
+ "peer" , env .Peer ,
99
+ )
113
100
}
101
+ }
114
102
115
- bs .sentHistogram .Observe (float64 (msgSize ))
116
- err := bs .network .SendMessage (ctx , env .Peer , msg )
103
+ func (bs * Bitswap ) sendBlocks (ctx context.Context , env * engine.Envelope ) {
104
+ // Blocks need to be sent synchronously to maintain proper backpressure
105
+ // throughout the network stack
106
+ defer env .Sent ()
107
+
108
+ bs .logOutgoingBlocks (env )
109
+
110
+ err := bs .network .SendMessage (ctx , env .Peer , env .Message )
117
111
if err != nil {
118
- // log.Infof("sendblock error: %s", err)
119
- log .Errorf ("SendMessage error: %s. size: %d. block-presence length: %d" , err , msg .Size (), len (env .Message .BlockPresences ()))
112
+ log .Debugw ("failed to send blocks message" ,
113
+ "peer" , env .Peer ,
114
+ "error" , err ,
115
+ )
116
+ return
117
+ }
118
+
119
+ dataSent := 0
120
+ blocks := env .Message .Blocks ()
121
+ for _ , b := range blocks {
122
+ dataSent += len (b .RawData ())
120
123
}
121
- log .Infof ("Sent message to %s" , env .Peer )
124
+ bs .counterLk .Lock ()
125
+ bs .counters .blocksSent += uint64 (len (blocks ))
126
+ bs .counters .dataSent += uint64 (dataSent )
127
+ bs .counterLk .Unlock ()
128
+ bs .sentHistogram .Observe (float64 (env .Message .Size ()))
129
+ log .Debugw ("sent message" , "peer" , env .Peer )
122
130
}
123
131
124
132
func (bs * Bitswap ) provideWorker (px process.Process ) {
0 commit comments