Skip to content
This repository was archived by the owner on Jan 24, 2025. It is now read-only.

Commit 97bab37

Browse files
authored
Merge pull request #982 from iotaledger/fix/scheduler-token-bucket
Fix scheduler token bucket waiting
2 parents f889322 + 21197e8 commit 97bab37

File tree

6 files changed

+29
-6
lines changed

6 files changed

+29
-6
lines changed

pkg/network/p2p/neighbor.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package p2p
33
import (
44
"context"
55
"sync"
6+
"sync/atomic"
67
"time"
78

89
"github.com/libp2p/go-libp2p/core/protocol"
@@ -15,7 +16,8 @@ import (
1516
)
1617

1718
const (
18-
NeighborsSendQueueSize = 20_000
19+
NeighborsSendQueueSize = 20_000
20+
DroppedPacketDisconnectThreshold = 100
1921
)
2022

2123
type queuedPacket struct {
@@ -31,7 +33,8 @@ type (
3133

3234
// neighbor describes the established p2p connection to another peer.
3335
type neighbor struct {
34-
peer *network.Peer
36+
peer *network.Peer
37+
droppedPacketCounter atomic.Uint32
3538

3639
logger log.Logger
3740

@@ -84,7 +87,12 @@ func (n *neighbor) Peer() *network.Peer {
8487
func (n *neighbor) Enqueue(packet proto.Message, protocolID protocol.ID) {
8588
select {
8689
case n.sendQueue <- &queuedPacket{protocolID: protocolID, packet: packet}:
90+
n.droppedPacketCounter.Store(0)
8791
default:
92+
// Drop a neighbor that does not read from the full queue.
93+
if n.droppedPacketCounter.Add(1) >= DroppedPacketDisconnectThreshold {
94+
n.Close()
95+
}
8896
n.logger.LogWarn("Dropped packet due to SendQueue being full")
8997
}
9098
}

pkg/protocol/engine/blockdag/events.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package blockdag
33
import (
44
"github.com/iotaledger/hive.go/runtime/event"
55
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
6+
iotago "github.com/iotaledger/iota.go/v4"
67
)
78

89
// Events is a collection of Tangle related Events.
@@ -19,6 +20,9 @@ type Events struct {
1920
// MissingBlockAppended is triggered when a previously missing Block was appended.
2021
MissingBlockAppended *event.Event1[*blocks.Block]
2122

23+
// BlockNotAppended is triggered when an incoming Block could not be successfully appended.
24+
BlockNotAppended *event.Event1[iotago.BlockID]
25+
2226
// BlockInvalid is triggered when a Block is found to be invalid.
2327
BlockInvalid *event.Event2[*blocks.Block, error]
2428

@@ -32,6 +36,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
3236
BlockSolid: event.New1[*blocks.Block](),
3337
BlockMissing: event.New1[*blocks.Block](),
3438
MissingBlockAppended: event.New1[*blocks.Block](),
39+
BlockNotAppended: event.New1[iotago.BlockID](),
3540
BlockInvalid: event.New2[*blocks.Block, error](),
3641
}
3742
})

pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin
4747

4848
e.Events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) {
4949
if _, _, err := b.Append(block); err != nil {
50+
b.events.BlockNotAppended.Trigger(block.ID())
5051
b.LogError("failed to append block", "blockID", block.ID(), "issuer", block.ProtocolBlock().Header.IssuerID, "err", err)
5152
}
5253
}, event.WithWorkerPool(wp))

pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ func (b *BasicBuffer) ringInsert(v interface{}) *ring.Ring {
371371
func (b *BasicBuffer) waitTime(rate float64, block *blocks.Block) time.Duration {
372372
tokensRequired := float64(block.WorkScore()) - (b.tokenBucket + rate*time.Since(b.lastScheduleTime).Seconds())
373373

374-
return lo.Max(0, time.Duration(tokensRequired/rate))
374+
return lo.Max(0, time.Duration(tokensRequired/rate)*time.Second)
375375
}
376376

377377
func (b *BasicBuffer) updateTokenBucket(rate float64, tokenBucketSize float64) {

pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,10 +370,11 @@ loop:
370370
case blockToSchedule = <-s.basicBuffer.blockChan:
371371
currentAPI := s.apiProvider.CommittedAPI()
372372
rate := currentAPI.ProtocolParameters().CongestionControlParameters().SchedulerRate
373-
if waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0 {
373+
for waitTime := s.basicBuffer.waitTime(float64(rate), blockToSchedule); waitTime > 0; {
374374
timer := time.NewTimer(waitTime)
375375
<-timer.C
376376
}
377+
377378
s.basicBuffer.updateTokenBucket(float64(rate), float64(currentAPI.MaxBlockWork()))
378379

379380
s.scheduleBasicBlock(blockToSchedule)

pkg/protocol/engine/engine.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,15 +585,23 @@ func (e *Engine) setupEvictionState() {
585585

586586
func (e *Engine) setupBlockRequester() {
587587
e.Events.BlockRequester.LinkTo(e.BlockRequester.Events)
588-
588+
wp := e.Workers.CreatePool("BlockRequester", workerpool.WithWorkerCount(1)) // Using just 1 worker to avoid contention
589589
// We need to hook to make sure that the request is created before the block arrives to avoid a race condition
590590
// where we try to delete the request again before it is created. Thus, continuing to request forever.
591591
e.Events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) {
592592
e.BlockRequester.StartTicker(block.ID())
593593
})
594+
594595
e.Events.BlockDAG.MissingBlockAppended.Hook(func(block *blocks.Block) {
595596
e.BlockRequester.StopTicker(block.ID())
596-
}, event.WithWorkerPool(e.Workers.CreatePool("BlockRequester", workerpool.WithWorkerCount(1)))) // Using just 1 worker to avoid contention
597+
}, event.WithWorkerPool(wp))
598+
599+
// Remove the block from the ticker if it failed to be appended.
600+
// It's executed for all blocks to avoid locking twice:
601+
// once to check if the block has the ticker and then again to remove it.
602+
e.Events.BlockDAG.BlockNotAppended.Hook(func(blockID iotago.BlockID) {
603+
e.BlockRequester.StopTicker(blockID)
604+
}, event.WithWorkerPool(wp))
597605
}
598606

599607
func (e *Engine) setupPruning() {

0 commit comments

Comments
 (0)