Skip to content

Commit

Permalink
Merge pull request #6997 from onflow/petera/make-peer-cache-config
Browse files Browse the repository at this point in the history
[Network] Make ProtocolPeerCache configurable
  • Loading branch information
peterargue authored Feb 7, 2025
2 parents 8fbbd56 + 16503c2 commit 6bd6cab
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 62 deletions.
1 change: 1 addition & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2362,6 +2362,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
&p2pbuilderconfig.UnicastConfig{
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
}).
SetProtocolPeerCacheList(protocols.FlowProtocolID(builder.SporkID)).
SetBasicResolver(builder.Resolver).
SetSubscriptionFilter(networkingsubscription.NewRoleBasedFilter(flow.RoleAccess, builder.IdentityProvider)).
SetConnectionManager(connManager).
Expand Down
1 change: 1 addition & 0 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ func (fnb *FlowNodeBuilder) BuildPublicLibp2pNode(address string, bootstrapIdent
&p2pbuilderconfig.UnicastConfig{
Unicast: fnb.FlowConfig.NetworkConfig.Unicast,
}).
SetProtocolPeerCacheList(protocols.FlowProtocolID(fnb.SporkID)).
SetSubscriptionFilter(
subscription.NewRoleBasedFilter(
subscription.UnstakedRole, fnb.IdentityProvider,
Expand Down
8 changes: 5 additions & 3 deletions network/p2p/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -206,7 +207,8 @@ type NodeConfig struct {
// logger used to provide logging
Logger zerolog.Logger `validate:"required"`
// reference to the libp2p host (https://godoc.org/github.com/libp2p/go-libp2p/core/host)
Host host.Host `validate:"required"`
PeerManager PeerManager
DisallowListCacheCfg *DisallowListCacheConfig `validate:"required"`
Host host.Host `validate:"required"`
PeerManager PeerManager
DisallowListCacheCfg *DisallowListCacheConfig `validate:"required"`
ProtocolPeerCacheList []protocol.ID
}
37 changes: 23 additions & 14 deletions network/p2p/builder/libp2pNodeBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/core/transport"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
Expand Down Expand Up @@ -63,16 +64,17 @@ type LibP2PNodeBuilder struct {
metricsConfig *p2pbuilderconfig.MetricsConfig
basicResolver madns.BasicResolver

resourceManager network.ResourceManager
resourceManagerCfg *p2pconfig.ResourceManagerConfig
connManager connmgr.ConnManager
connGater p2p.ConnectionGater
routingFactory func(context.Context, host.Host) (routing.Routing, error)
peerManagerConfig *p2pbuilderconfig.PeerManagerConfig
createNode p2p.NodeConstructor
disallowListCacheCfg *p2p.DisallowListCacheConfig
unicastConfig *p2pbuilderconfig.UnicastConfig
networkingType flownet.NetworkingType // whether the node is running in private (staked) or public (unstaked) network
resourceManager network.ResourceManager
resourceManagerCfg *p2pconfig.ResourceManagerConfig
connManager connmgr.ConnManager
connGater p2p.ConnectionGater
routingFactory func(context.Context, host.Host) (routing.Routing, error)
peerManagerConfig *p2pbuilderconfig.PeerManagerConfig
createNode p2p.NodeConstructor
disallowListCacheCfg *p2p.DisallowListCacheConfig
unicastConfig *p2pbuilderconfig.UnicastConfig
networkingType flownet.NetworkingType // whether the node is running in private (staked) or public (unstaked) network
protocolPeerCacheList []protocol.ID
}

func NewNodeBuilder(
Expand Down Expand Up @@ -155,6 +157,12 @@ func (builder *LibP2PNodeBuilder) OverrideDefaultValidateQueueSize(size int) p2p
return builder
}

// SetProtocolPeerCacheList sets the protocols to track in the protocol peer cache.
func (builder *LibP2PNodeBuilder) SetProtocolPeerCacheList(protocols ...protocol.ID) p2p.NodeBuilder {
builder.protocolPeerCacheList = protocols
return builder
}

// OverrideGossipSubFactory overrides the default gossipsub factory for the GossipSub protocol.
// The purpose of override is to allow the node to provide a custom gossipsub factory for sake of testing or experimentation.
// Note: it is not recommended to override the default gossipsub factory in production unless you know what you are doing.
Expand Down Expand Up @@ -284,10 +292,11 @@ func (builder *LibP2PNodeBuilder) Build() (p2p.LibP2PNode, error) {
Parameters: &p2p.NodeParameters{
EnableProtectedStreams: builder.unicastConfig.EnableStreamProtection,
},
Logger: builder.logger,
Host: h,
PeerManager: peerManager,
DisallowListCacheCfg: builder.disallowListCacheCfg,
Logger: builder.logger,
Host: h,
PeerManager: peerManager,
DisallowListCacheCfg: builder.disallowListCacheCfg,
ProtocolPeerCacheList: builder.protocolPeerCacheList,
})
if err != nil {
return nil, fmt.Errorf("could not create libp2p node: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type ProtocolPeerCache interface {
// RemoveProtocols removes the specified protocols for the given peer from the protocol cache.
RemoveProtocols(peerID peer.ID, protocols []protocol.ID)

// GetPeers returns a copy of the set of peers that support the given protocol.
GetPeers(pid protocol.ID) map[peer.ID]struct{}
// GetPeers returns the set of peers that support the given protocol.
GetPeers(pid protocol.ID) peer.IDSlice
}

// UpdateFunction is a function that adjusts the GossipSub spam record of a peer.
Expand Down
8 changes: 4 additions & 4 deletions network/p2p/mock/protocol_peer_cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 27 additions & 21 deletions network/p2p/node/internal/protocolPeerCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/rs/zerolog"
"golang.org/x/exp/maps"

p2plogging "github.com/onflow/flow-go/network/p2p/logging"
)
Expand All @@ -20,13 +21,26 @@ type ProtocolPeerCache struct {
sync.RWMutex
}

func NewProtocolPeerCache(logger zerolog.Logger, h host.Host) (*ProtocolPeerCache, error) {
// NewProtocolPeerCache creates a new ProtocolPeerCache instance using the given host and supported protocols
// Only protocols passed in the protocols list will be tracked
func NewProtocolPeerCache(logger zerolog.Logger, h host.Host, protocols []protocol.ID) (*ProtocolPeerCache, error) {
protocolPeers := make(map[protocol.ID]map[peer.ID]struct{})
for _, pid := range protocols {
protocolPeers[pid] = make(map[peer.ID]struct{})
}
p := &ProtocolPeerCache{protocolPeers: protocolPeers}

// If no protocols are passed, this is a noop cache
if len(protocols) == 0 {
return p, nil
}

sub, err := h.EventBus().
Subscribe([]interface{}{new(event.EvtPeerIdentificationCompleted), new(event.EvtPeerProtocolsUpdated)})
if err != nil {
return nil, fmt.Errorf("could not subscribe to peer protocol update events: %w", err)
}
p := &ProtocolPeerCache{protocolPeers: make(map[protocol.ID]map[peer.ID]struct{})}

h.Network().Notify(&libp2pnet.NotifyBundle{
DisconnectedF: func(n libp2pnet.Network, c libp2pnet.Conn) {
peer := c.RemotePeer()
Expand All @@ -43,49 +57,41 @@ func NewProtocolPeerCache(logger zerolog.Logger, h host.Host) (*ProtocolPeerCach
func (p *ProtocolPeerCache) RemovePeer(peerID peer.ID) {
p.Lock()
defer p.Unlock()
for pid, peers := range p.protocolPeers {
for _, peers := range p.protocolPeers {
delete(peers, peerID)
if len(peers) == 0 {
delete(p.protocolPeers, pid)
}
}
}

func (p *ProtocolPeerCache) AddProtocols(peerID peer.ID, protocols []protocol.ID) {
p.Lock()
defer p.Unlock()
for _, pid := range protocols {
peers, ok := p.protocolPeers[pid]
if !ok {
peers = make(map[peer.ID]struct{})
p.protocolPeers[pid] = peers
if peers, ok := p.protocolPeers[pid]; ok {
peers[peerID] = struct{}{}
}
peers[peerID] = struct{}{}
}
}

func (p *ProtocolPeerCache) RemoveProtocols(peerID peer.ID, protocols []protocol.ID) {
p.Lock()
defer p.Unlock()
for _, pid := range protocols {
peers := p.protocolPeers[pid]
delete(peers, peerID)
if len(peers) == 0 {
delete(p.protocolPeers, pid)
if peers, ok := p.protocolPeers[pid]; ok {
delete(peers, peerID)
}
}
}

func (p *ProtocolPeerCache) GetPeers(pid protocol.ID) map[peer.ID]struct{} {
func (p *ProtocolPeerCache) GetPeers(pid protocol.ID) peer.IDSlice {
p.RLock()
defer p.RUnlock()

// it is not safe to return a reference to the map, so we make a copy
peersCopy := make(map[peer.ID]struct{}, len(p.protocolPeers[pid]))
for peerID := range p.protocolPeers[pid] {
peersCopy[peerID] = struct{}{}
peers, ok := p.protocolPeers[pid]
if !ok {
return peer.IDSlice{}
}
return peersCopy

return maps.Keys(peers)
}

func (p *ProtocolPeerCache) consumeSubscription(logger zerolog.Logger, h host.Host, sub event.Subscription) {
Expand Down
26 changes: 15 additions & 11 deletions network/p2p/node/internal/protocolPeerCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal_test

import (
"context"
"slices"
"testing"
"time"

Expand All @@ -22,20 +23,22 @@ func TestProtocolPeerCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p1 := protocol.ID("p1")
p2 := protocol.ID("p2")
p3 := protocol.ID("p3")

// create three hosts, and a pcache for the first
// the cache supports all 3
h1, err := p2pbuilder.DefaultLibP2PHost(unittest.DefaultAddress, unittest.KeyFixture(crypto.ECDSASecp256k1))
require.NoError(t, err)
pcache, err := internal.NewProtocolPeerCache(zerolog.Nop(), h1)
pcache, err := internal.NewProtocolPeerCache(zerolog.Nop(), h1, []protocol.ID{p1, p2, p3})
require.NoError(t, err)
h2, err := p2pbuilder.DefaultLibP2PHost(unittest.DefaultAddress, unittest.KeyFixture(crypto.ECDSASecp256k1))
require.NoError(t, err)
h3, err := p2pbuilder.DefaultLibP2PHost(unittest.DefaultAddress, unittest.KeyFixture(crypto.ECDSASecp256k1))
require.NoError(t, err)

// register each host on a separate protocol
p1 := protocol.ID("p1")
p2 := protocol.ID("p2")
p3 := protocol.ID("p3")
noopHandler := func(s network.Stream) {}
h1.SetStreamHandler(p1, noopHandler)
h2.SetStreamHandler(p2, noopHandler)
Expand All @@ -50,8 +53,8 @@ func TestProtocolPeerCache(t *testing.T) {
assert.Eventually(t, func() bool {
peers2 := pcache.GetPeers(p2)
peers3 := pcache.GetPeers(p3)
_, ok2 := peers2[h2.ID()]
_, ok3 := peers3[h3.ID()]
ok2 := slices.Contains(peers2, h2.ID())
ok3 := slices.Contains(peers3, h3.ID())
return len(peers2) == 1 && len(peers3) == 1 && ok2 && ok3
}, 3*time.Second, 50*time.Millisecond)

Expand All @@ -64,15 +67,16 @@ func TestProtocolPeerCache(t *testing.T) {
}, 3*time.Second, 50*time.Millisecond)

// add support for p4 on h2 and h3
// note: pcache does NOT support p4 and should not cache it
p4 := protocol.ID("p4")
h2.SetStreamHandler(p4, noopHandler)
h3.SetStreamHandler(p4, noopHandler)

// check that h1's pcache reflects the change
assert.Eventually(t, func() bool {
// check that h1's pcache never contains p4
assert.Never(t, func() bool {
peers4 := pcache.GetPeers(p4)
_, ok2 := peers4[h2.ID()]
_, ok3 := peers4[h3.ID()]
ok2 := slices.Contains(peers4, h2.ID())
ok3 := slices.Contains(peers4, h3.ID())
return len(peers4) == 2 && ok2 && ok3
}, 3*time.Second, 50*time.Millisecond)
}, 1*time.Second, 50*time.Millisecond)
}
9 changes: 2 additions & 7 deletions network/p2p/node/libp2pNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewNode(cfg *p2p.NodeConfig) (*Node, error) {
return nil, fmt.Errorf("invalid config: %w", err)
}

pCache, err := nodeinternal.NewProtocolPeerCache(cfg.Logger, cfg.Host)
pCache, err := nodeinternal.NewProtocolPeerCache(cfg.Logger, cfg.Host, cfg.ProtocolPeerCacheList)
if err != nil {
return nil, fmt.Errorf("failed to create protocol peer cache: %w", err)
}
Expand Down Expand Up @@ -182,12 +182,7 @@ func (n *Node) RemovePeer(peerID peer.ID) error {

// GetPeersForProtocol returns slice peer IDs for the specified protocol ID.
func (n *Node) GetPeersForProtocol(pid protocol.ID) peer.IDSlice {
pMap := n.pCache.GetPeers(pid)
peers := make(peer.IDSlice, 0, len(pMap))
for p := range pMap {
peers = append(peers, p)
}
return peers
return n.pCache.GetPeers(pid)
}

// OpenAndWriteOnStream opens a new stream to a peer. The stream is opened to the given peerID
Expand Down

0 comments on commit 6bd6cab

Please sign in to comment.