Skip to content

Commit 6c49ac8

Browse files
authored
Merge pull request ipfs/go-bitswap#255 from ipfs/feat/debounce-alt
feat: debounce wants manually This commit was moved from ipfs/go-bitswap@f4c9702
2 parents 7d9cbcc + 4017e53 commit 6c49ac8

File tree

1 file changed

+50
-14
lines changed

1 file changed

+50
-14
lines changed

bitswap/internal/messagequeue/messagequeue.go

+50-14
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"sync"
77
"time"
88

9-
debounce "github.com/bep/debounce"
10-
119
bsmsg "github.com/ipfs/go-bitswap/message"
1210
pb "github.com/ipfs/go-bitswap/message/pb"
1311
bsnet "github.com/ipfs/go-bitswap/network"
@@ -34,6 +32,11 @@ const (
3432
maxPriority = math.MaxInt32
3533
// sendMessageDebounce is the debounce duration when calling sendMessage()
3634
sendMessageDebounce = time.Millisecond
35+
// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
36+
sendMessageCutoff = 256
37+
// when we debounce for more than sendMessageMaxDelay, we'll send the
38+
// message immediately.
39+
sendMessageMaxDelay = 20 * time.Millisecond
3740
)
3841

3942
// MessageNetwork is any network that can connect peers and generate a message
@@ -54,9 +57,8 @@ type MessageQueue struct {
5457
maxMessageSize int
5558
sendErrorBackoff time.Duration
5659

57-
signalWorkReady func()
58-
outgoingWork chan struct{}
59-
done chan struct{}
60+
outgoingWork chan time.Time
61+
done chan struct{}
6062

6163
// Take lock whenever any of these variables are modified
6264
wllock sync.Mutex
@@ -165,17 +167,13 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
165167
bcstWants: newRecallWantList(),
166168
peerWants: newRecallWantList(),
167169
cancels: cid.NewSet(),
168-
outgoingWork: make(chan struct{}, 1),
170+
outgoingWork: make(chan time.Time, 1),
169171
done: make(chan struct{}),
170172
rebroadcastInterval: defaultRebroadcastInterval,
171173
sendErrorBackoff: sendErrorBackoff,
172174
priority: maxPriority,
173175
}
174176

175-
// Apply debounce to the work ready signal (which triggers sending a message)
176-
debounced := debounce.New(sendMessageDebounce)
177-
mq.signalWorkReady = func() { debounced(mq.onWorkReady) }
178-
179177
return mq
180178
}
181179

@@ -285,11 +283,45 @@ func (mq *MessageQueue) onShutdown() {
285283
func (mq *MessageQueue) runQueue() {
286284
defer mq.onShutdown()
287285

286+
// Create a timer for debouncing scheduled work.
287+
scheduleWork := time.NewTimer(0)
288+
if !scheduleWork.Stop() {
289+
// Need to drain the timer if Stop() returns false
290+
// See: https://golang.org/pkg/time/#Timer.Stop
291+
<-scheduleWork.C
292+
}
293+
294+
var workScheduled time.Time
288295
for {
289296
select {
290297
case <-mq.rebroadcastTimer.C:
291298
mq.rebroadcastWantlist()
292-
case <-mq.outgoingWork:
299+
case when := <-mq.outgoingWork:
300+
// If we have work scheduled, cancel the timer. If we
301+
// don't, record when the work was scheduled.
302+
// We send the time on the channel so we accurately
303+
// track delay.
304+
if workScheduled.IsZero() {
305+
workScheduled = when
306+
} else if !scheduleWork.Stop() {
307+
// Need to drain the timer if Stop() returns false
308+
<-scheduleWork.C
309+
}
310+
311+
// If we have too many updates and/or we've waited too
312+
// long, send immediately.
313+
if mq.pendingWorkCount() > sendMessageCutoff ||
314+
time.Since(workScheduled) >= sendMessageMaxDelay {
315+
mq.sendIfReady()
316+
workScheduled = time.Time{}
317+
} else {
318+
// Otherwise, extend the timer.
319+
scheduleWork.Reset(sendMessageDebounce)
320+
}
321+
case <-scheduleWork.C:
322+
// We have work scheduled and haven't seen any updates
323+
// in sendMessageDebounce. Send immediately.
324+
workScheduled = time.Time{}
293325
mq.sendIfReady()
294326
case <-mq.done:
295327
if mq.sender != nil {
@@ -335,9 +367,9 @@ func (mq *MessageQueue) transferRebroadcastWants() bool {
335367
return true
336368
}
337369

338-
func (mq *MessageQueue) onWorkReady() {
370+
func (mq *MessageQueue) signalWorkReady() {
339371
select {
340-
case mq.outgoingWork <- struct{}{}:
372+
case mq.outgoingWork <- time.Now():
341373
default:
342374
}
343375
}
@@ -443,10 +475,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
443475
// }
444476

445477
func (mq *MessageQueue) hasPendingWork() bool {
478+
return mq.pendingWorkCount() > 0
479+
}
480+
481+
func (mq *MessageQueue) pendingWorkCount() int {
446482
mq.wllock.Lock()
447483
defer mq.wllock.Unlock()
448484

449-
return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0
485+
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
450486
}
451487

452488
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {

0 commit comments

Comments
 (0)