Skip to content

Commit

Permalink
feat: impl filter with transport
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Feb 5, 2025
1 parent 05aa7b6 commit 73881f0
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 42 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ impl NetworkService {
let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
let count = max((config.max_outbound_peers >> 1) as usize, 1);
let mut addrs: Vec<_> = peer_store
.fetch_addrs_to_attempt(count, *target)
.fetch_addrs_to_attempt(count, *target, |_| true)
.into_iter()
.map(|paddr| paddr.addr)
.collect();
Expand Down
29 changes: 21 additions & 8 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,15 @@ impl PeerStore {
}

/// Get peers for outbound connection, this method randomly return recently connected peer addrs
pub fn fetch_addrs_to_attempt(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
pub fn fetch_addrs_to_attempt<F>(
&mut self,
count: usize,
required_flags: Flags,
filter: F,
) -> Vec<AddrInfo>
where
F: Fn(&AddrInfo) -> bool,
{
// Get info:
// 1. Not already connected
// 2. Connected within 3 days
Expand All @@ -167,9 +175,10 @@ impl PeerStore {
let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);

let filter = |peer_addr: &AddrInfo| {
extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
filter(peer_addr)
&& extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
&& peer_addr
.connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL))
&& required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
Expand All @@ -181,7 +190,10 @@ impl PeerStore {

/// Get peers for feeler connection, this method randomly return peer addrs that we never
/// connected to.
pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec<AddrInfo> {
pub fn fetch_addrs_to_feeler<F>(&mut self, count: usize, filter: F) -> Vec<AddrInfo>
where
F: Fn(&AddrInfo) -> bool,
{
// Get info:
// 1. Not already connected
// 2. Not already tried in a minute
Expand All @@ -192,9 +204,10 @@ impl PeerStore {
let peers = &self.connected_peers;

let filter = |peer_addr: &AddrInfo| {
extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
filter(peer_addr)
&& extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
&& !peer_addr.tried_in_last_minute(now_ms)
&& !peer_addr.connected(|t| t > addr_expired_ms)
};
Expand Down
2 changes: 1 addition & 1 deletion network/src/protocols/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ fn test_discovery_behavior() {
let mut locked = node1.network_state.peer_store.lock();

locked
.fetch_addrs_to_feeler(6)
.fetch_addrs_to_feeler(6, |_| true)
.into_iter()
.map(|peer| peer.addr)
.flat_map(|addr| {
Expand Down
35 changes: 33 additions & 2 deletions network/src/services/outbound_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,23 @@ impl OutboundPeerService {

fn dial_feeler(&mut self) {
let now_ms = unix_time_as_millis();
let filter: Box<dyn Fn(&AddrInfo) -> bool> = match self.transport_type {
TransportType::Tcp => Box::new(|_| true),
TransportType::Ws => Box::new(|peer_addr: &AddrInfo| {
peer_addr
.addr
.iter()
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Tcp(_)))
}),
TransportType::Wss => Box::new(|peer_addr: &AddrInfo| {
peer_addr
.addr
.iter()
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_)))
}),
};
let attempt_peers = self.network_state.with_peer_store_mut(|peer_store| {
let paddrs = peer_store.fetch_addrs_to_feeler(FEELER_CONNECTION_COUNT);
let paddrs = peer_store.fetch_addrs_to_feeler(FEELER_CONNECTION_COUNT, filter);
for paddr in paddrs.iter() {
// mark addr as tried
if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) {
Expand Down Expand Up @@ -97,8 +112,24 @@ impl OutboundPeerService {

let target = &self.network_state.required_flags;

let filter: Box<dyn Fn(&AddrInfo) -> bool> = match self.transport_type {
TransportType::Tcp => Box::new(|_| true),
TransportType::Ws => Box::new(|peer_addr: &AddrInfo| {
peer_addr
.addr
.iter()
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Tcp(_)))
}),
TransportType::Wss => Box::new(|peer_addr: &AddrInfo| {
peer_addr
.addr
.iter()
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_)))
}),
};

let f = |peer_store: &mut PeerStore, number: usize, now_ms: u64| -> Vec<AddrInfo> {
let paddrs = peer_store.fetch_addrs_to_attempt(number, *target);
let paddrs = peer_store.fetch_addrs_to_attempt(number, *target, filter);
for paddr in paddrs.iter() {
// mark addr as tried
if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) {
Expand Down
52 changes: 26 additions & 26 deletions network/src/tests/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ fn test_add_addr() {
let mut peer_store: PeerStore = Default::default();
assert_eq!(
peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.len(),
0
);
let addr = random_addr();
peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap();
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
// we have not connected yet, so return 0
assert_eq!(
peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.len(),
0
);
Expand Down Expand Up @@ -141,14 +141,14 @@ fn test_attempt_ban() {

assert_eq!(
peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.len(),
1
);
peer_store.ban_addr(&addr, 10_000, "no reason".into());
assert_eq!(
peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.len(),
0
);
Expand All @@ -161,7 +161,7 @@ fn test_fetch_addrs_to_attempt() {

let mut peer_store: PeerStore = Default::default();
assert!(peer_store
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
.is_empty());
let addr = random_addr();
peer_store
Expand All @@ -176,13 +176,13 @@ fn test_fetch_addrs_to_attempt() {

assert_eq!(
peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.len(),
1
);
peer_store.add_connected_peer(addr, SessionType::Outbound);
assert!(peer_store
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
.is_empty());
}

Expand All @@ -199,18 +199,18 @@ fn test_fetch_addrs_to_attempt_or_feeler() {

assert_eq!(
peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.len(),
1
);
assert!(peer_store.fetch_addrs_to_feeler(2).is_empty());
assert!(peer_store.fetch_addrs_to_feeler(2, |_| true).is_empty());

_faketime_guard.set_faketime(100_000 + ADDR_TRY_TIMEOUT_MS + 1);

assert!(peer_store
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(2, Flags::COMPATIBILITY, |_| true)
.is_empty());
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
}

#[test]
Expand All @@ -229,14 +229,14 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
paddr.mark_tried(now);
}
assert!(peer_store
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
.is_empty());
// after 60 seconds
if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) {
paddr.mark_tried(now - 60_001);
}
assert!(peer_store
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
.is_empty());
peer_store
.mut_addr_manager()
Expand All @@ -247,7 +247,7 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {

assert_eq!(
peer_store
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
.len(),
1
);
Expand All @@ -256,7 +256,7 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
}
assert_eq!(
peer_store
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY)
.fetch_addrs_to_attempt(1, Flags::COMPATIBILITY, |_| true)
.len(),
1
);
Expand All @@ -265,18 +265,18 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() {
#[test]
fn test_fetch_addrs_to_feeler() {
let mut peer_store: PeerStore = Default::default();
assert!(peer_store.fetch_addrs_to_feeler(1).is_empty());
assert!(peer_store.fetch_addrs_to_feeler(1, |_| true).is_empty());
let addr = random_addr();

// add an addr
peer_store
.add_addr(addr.clone(), Flags::COMPATIBILITY)
.unwrap();
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);

// ignores connected peers' addrs
peer_store.add_connected_peer(addr.clone(), SessionType::Outbound);
assert!(peer_store.fetch_addrs_to_feeler(1).is_empty());
assert!(peer_store.fetch_addrs_to_feeler(1, |_| true).is_empty());

// peer does not need feeler if it connected to us recently
peer_store
Expand All @@ -285,7 +285,7 @@ fn test_fetch_addrs_to_feeler() {
.unwrap()
.last_connected_at_ms = ckb_systemtime::unix_time_as_millis();
peer_store.remove_disconnected_peer(&addr);
assert!(peer_store.fetch_addrs_to_feeler(1).is_empty());
assert!(peer_store.fetch_addrs_to_feeler(1, |_| true).is_empty());
}

#[test]
Expand Down Expand Up @@ -581,10 +581,10 @@ fn test_addr_unique() {
.unwrap();
peer_store.add_addr(addr_1, Flags::COMPATIBILITY).unwrap();
assert_eq!(peer_store.addr_manager().addrs_iter().count(), 2);
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 2);
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 2);

peer_store.add_addr(addr, Flags::COMPATIBILITY).unwrap();
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 2);
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 2);

assert_eq!(peer_store.addr_manager().addrs_iter().count(), 2);
}
Expand All @@ -597,8 +597,8 @@ fn test_only_tcp_store() {
peer_store
.add_addr(addr.clone(), Flags::COMPATIBILITY)
.unwrap();
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, {
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(1, |_| true)[0].addr, {
addr.pop();
addr
});
Expand All @@ -618,6 +618,6 @@ fn test_support_dns_store() {
peer_store
.add_addr(addr.clone(), Flags::COMPATIBILITY)
.unwrap();
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, addr);
assert_eq!(peer_store.fetch_addrs_to_feeler(2, |_| true).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(1, |_| true)[0].addr, addr);
}

0 comments on commit 73881f0

Please sign in to comment.