From 1f6f25f08ad65cb02c554bb94bedfcfed7073f0a Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 16:27:01 +0100 Subject: [PATCH 1/2] Use the daemon context when handling a stream so that it properly handles the shutdown and does not wait for a deadline --- pkg/network/p2p/autopeering/autopeering.go | 5 +++++ pkg/network/p2p/manager.go | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/network/p2p/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go index 62a26db63..67d5748ae 100644 --- a/pkg/network/p2p/autopeering/autopeering.go +++ b/pkg/network/p2p/autopeering/autopeering.go @@ -213,6 +213,11 @@ func (m *Manager) discoverAndDialPeers() { } for peerAddrInfo := range peerChan { + if m.ctx.Err() != nil { + m.logger.LogDebug("Context is done, stopping dialing new peers") + return + } + if peersToFind <= 0 { m.logger.LogDebug("Enough new autopeering peers connected") return diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 1bf5ece9e..44a559d1d 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -38,6 +38,8 @@ type Manager struct { libp2pHost host.Host peerDB *network.DB + ctx context.Context + logger log.Logger shutdownMutex syncutils.RWMutex @@ -158,6 +160,8 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { // Start starts the manager and initiates manual- and autopeering. func (m *Manager) Start(ctx context.Context, networkID string) error { + m.ctx = ctx + m.manualPeering.Start() if m.autoPeering.MaxNeighbors() > 0 { @@ -283,6 +287,12 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { m.allowPeerMutex.Lock() defer m.allowPeerMutex.Unlock() + if m.ctx.Err() != nil { + m.logger.LogDebugf("aborting handling stream, context is done") + m.closeStream(stream) + + return + } peerID := stream.Conn().RemotePeer() @@ -314,7 +324,7 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { return } - if err := m.addNeighbor(context.Background(), networkPeer, ps); err != nil { + if err := m.addNeighbor(m.ctx, networkPeer, ps); err != nil { m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peerID, err) m.closeStream(stream) From 9f0eeb960eb90ba75da0b082203c434a56e1182b Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 21 Feb 2024 17:02:13 +0100 Subject: [PATCH 2/2] Removed allowPeerMutex. If too many connections are opened they will get trimmed on the next discovery tick --- pkg/network/p2p/manager.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 44a559d1d..3149d3883 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -52,8 +52,6 @@ type Manager struct { autoPeering *autopeering.Manager manualPeering *manualpeering.Manager - - allowPeerMutex syncutils.Mutex } var _ network.Manager = (*Manager)(nil) @@ -118,9 +116,6 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { return ierrors.Wrapf(network.ErrDuplicatePeer, "peer %s already exists", peer.ID) } - m.allowPeerMutex.Lock() - defer m.allowPeerMutex.Unlock() - if !m.allowPeer(peer.ID) { return ierrors.Wrapf(network.ErrMaxAutopeeringPeersReached, "peer %s is not allowed", peer.ID) } @@ -285,8 +280,6 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { return } - m.allowPeerMutex.Lock() - defer m.allowPeerMutex.Unlock() if m.ctx.Err() != nil { m.logger.LogDebugf("aborting handling stream, context is done") m.closeStream(stream) @@ -362,7 +355,9 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe } firstPacketReceivedCtx, firstPacketReceivedCancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second)) - // create and add the neighbor + defer firstPacketReceivedCancel() + + var innerErr error nbr := newNeighbor(m.logger, peer, ps, func(nbr *neighbor, packet proto.Message) { m.protocolHandlerMutex.RLock() defer m.protocolHandlerMutex.RUnlock() @@ -402,7 +397,7 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe return ierrors.WithStack(network.ErrFirstPacketNotReceived) } - return nil + return innerErr } func (m *Manager) NeighborExists(id peer.ID) bool { @@ -440,8 +435,6 @@ func (m *Manager) dropAllNeighbors() { } func (m *Manager) allowPeer(id peer.ID) (allow bool) { - // This should always be called from within the allowPeerMutex lock - // Always allow manual peers if m.manualPeering.IsPeerKnown(id) { m.logger.LogDebugf("Allow manual peer %s", id)