This repository was archived by the owner on Jan 24, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +10
-2
lines changed
Expand file tree Collapse file tree 1 file changed +10
-2
lines changed Original file line number Diff line number Diff line change @@ -3,6 +3,7 @@ package p2p
33import (
44 "context"
55 "sync"
6+ "sync/atomic"
67 "time"
78
89 "github.com/libp2p/go-libp2p/core/protocol"
@@ -15,7 +16,8 @@ import (
1516)
1617
1718const (
18- NeighborsSendQueueSize = 20_000
19+ NeighborsSendQueueSize = 20_000
20+ DroppedPacketDisconnectThreshold = 100
1921)
2022
2123type queuedPacket struct {
3133
3234// neighbor describes the established p2p connection to another peer.
3335type neighbor struct {
34- peer * network.Peer
36+ peer * network.Peer
37+ droppedPacketCounter atomic.Uint32
3538
3639 logger log.Logger
3740
@@ -84,7 +87,12 @@ func (n *neighbor) Peer() *network.Peer {
8487func (n * neighbor ) Enqueue (packet proto.Message , protocolID protocol.ID ) {
8588 select {
8689 case n .sendQueue <- & queuedPacket {protocolID : protocolID , packet : packet }:
90+ n .droppedPacketCounter .Store (0 )
8791 default :
92+ // Drop a neighbor that does not read from the full queue.
93+ if n .droppedPacketCounter .Add (1 ) >= DroppedPacketDisconnectThreshold {
94+ n .Close ()
95+ }
8896 n .logger .LogWarn ("Dropped packet due to SendQueue being full" )
8997 }
9098}
You can’t perform that action at this time.
0 commit comments