Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer store optimization #4783

Merged
merged 4 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion benches/benches/benchmarks/overall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController};
use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::JsonBytes;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_types::{
Expand Down Expand Up @@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
3 changes: 2 additions & 1 deletion chain/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig};
use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_test_chain_utils::{always_success_cell, create_always_success_tx};
Expand Down Expand Up @@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
75 changes: 47 additions & 28 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,14 @@ impl NetworkState {
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
.filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
_ => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
Expand Down Expand Up @@ -158,15 +157,14 @@ impl NetworkState {
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
.filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
_ => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
Expand Down Expand Up @@ -831,6 +829,7 @@ impl NetworkService {
required_protocol_ids: Vec<ProtocolId>,
// name, version, flags
identify_announce: (String, String, Flags),
transport_type: TransportType,
) -> Self {
let config = &network_state.config;

Expand Down Expand Up @@ -1017,7 +1016,7 @@ impl NetworkService {
service_builder = service_builder.tcp_config(bind_fn);
}
}
TransportType::Ws => {
TransportType::Ws | TransportType::Wss => {
// only bind once
if matches!(init, BindType::Ws) {
continue;
Expand Down Expand Up @@ -1074,6 +1073,7 @@ impl NetworkService {
Arc::clone(&network_state),
p2p_service.control().to_owned().into(),
Duration::from_secs(config.connect_outbound_interval_secs),
transport_type,
);
bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
};
Expand Down Expand Up @@ -1520,19 +1520,38 @@ pub(crate) async fn async_disconnect_with_message(
control.disconnect(peer_index).await
}

/// Transport type on ckb
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) enum TransportType {
pub enum TransportType {
/// Tcp
Tcp,
/// Ws
Ws,
/// Wss only on wasm
Wss,
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
if addr
.iter()
.any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss))
{
TransportType::Ws
} else {
TransportType::Tcp
impl<'a> From<TransportType> for p2p::multiaddr::Protocol<'a> {
fn from(value: TransportType) -> Self {
match value {
TransportType::Ws => Protocol::Ws,
TransportType::Wss => Protocol::Wss,
_ => unreachable!(),
driftluo marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
let mut iter = addr.iter();

iter.find_map(|proto| {
if let Protocol::Ws = proto {
Some(TransportType::Ws)
} else if let Protocol::Wss = proto {
Some(TransportType::Wss)
} else {
None
}
})
.unwrap_or(TransportType::Tcp)
driftluo marked this conversation as resolved.
Show resolved Hide resolved
}
110 changes: 50 additions & 60 deletions network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,40 @@
//! Address manager
#[cfg(target_family = "wasm")]
use crate::network::{find_type, TransportType};
use crate::peer_store::types::AddrInfo;
use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr};
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Address manager
#[derive(Default)]
pub struct AddrManager {
next_id: u64,
addr_to_id: HashMap<SocketAddr, u64>,
addr_to_id: HashMap<Multiaddr, u64>,
id_to_info: HashMap<u64, AddrInfo>,
random_ids: Vec<u64>,
}

impl AddrManager {
/// Add an address information to address manager
pub fn add(&mut self, mut addr_info: AddrInfo) {
if let Some(key) = multiaddr_to_socketaddr(&addr_info.addr) {
if let Some(&id) = self.addr_to_id.get(&key) {
let (exist_last_connected_at_ms, random_id_pos) = {
let info = self.id_to_info.get(&id).expect("must exists");
(info.last_connected_at_ms, info.random_id_pos)
};
// Get time earlier than record time, return directly
if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
addr_info.random_id_pos = random_id_pos;
self.id_to_info.insert(id, addr_info);
}
return;
if let Some(&id) = self.addr_to_id.get(&addr_info.addr) {
let (exist_last_connected_at_ms, random_id_pos) = {
let info = self.id_to_info.get(&id).expect("must exists");
(info.last_connected_at_ms, info.random_id_pos)
};
// Get time earlier than record time, return directly
if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
addr_info.random_id_pos = random_id_pos;
self.id_to_info.insert(id, addr_info);
}

let id = self.next_id;
self.addr_to_id.insert(key, id);
addr_info.random_id_pos = self.random_ids.len();
self.id_to_info.insert(id, addr_info);
self.random_ids.push(id);
self.next_id += 1;
return;
}

let id = self.next_id;
self.addr_to_id.insert(addr_info.addr.clone(), id);
addr_info.random_id_pos = self.random_ids.len();
self.id_to_info.insert(id, addr_info);
self.random_ids.push(id);
self.next_id += 1;
}

/// Randomly return addrs that worth to try or connect.
Expand All @@ -51,33 +46,36 @@ impl AddrManager {
let mut addr_infos = Vec::with_capacity(count);
let mut rng = rand::thread_rng();
let now_ms = ckb_systemtime::unix_time_as_millis();
#[cfg(target_family = "wasm")]
let filter = |peer_addr: &AddrInfo| {
filter(peer_addr) && matches!(find_type(&peer_addr.addr), TransportType::Ws)
};
for i in 0..self.random_ids.len() {
// reuse the for loop to shuffle random ids
// https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
let j = rng.gen_range(i..self.random_ids.len());
self.swap_random_id(j, i);
let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned();
if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) {
let ip = socket_addr.ip();
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
match multiaddr_to_socketaddr(&addr_info.addr) {
Some(socket_addr) => {
let ip = socket_addr.ip();
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
}
}
if addr_infos.len() == count {
break;
None => {
if addr_info.is_connectable(now_ms) && filter(&addr_info) {
addr_infos.push(addr_info);
}
}
}
if addr_infos.len() == count {
break;
}
}
addr_infos
}
Expand All @@ -94,34 +92,26 @@ impl AddrManager {

/// Remove an address by ip and port
pub fn remove(&mut self, addr: &Multiaddr) -> Option<AddrInfo> {
multiaddr_to_socketaddr(addr).and_then(|addr| {
self.addr_to_id.remove(&addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
self.random_ids.pop();
self.id_to_info.remove(&id)
})
self.addr_to_id.remove(addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
self.random_ids.pop();
self.id_to_info.remove(&id)
})
}

/// Get an address information by ip and port
pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> {
multiaddr_to_socketaddr(addr).and_then(|addr| {
self.addr_to_id
.get(&addr)
.and_then(|id| self.id_to_info.get(id))
})
self.addr_to_id
.get(addr)
.and_then(|id| self.id_to_info.get(id))
}

/// Get a mutable address information by ip and port
pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> {
if let Some(addr) = multiaddr_to_socketaddr(addr) {
if let Some(id) = self.addr_to_id.get(&addr) {
self.id_to_info.get_mut(id)
} else {
None
}
if let Some(id) = self.addr_to_id.get(addr) {
self.id_to_info.get_mut(id)
} else {
None
}
Expand Down
11 changes: 0 additions & 11 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::network::{find_type, TransportType};
use crate::{
errors::{PeerStoreError, Result},
extract_peer_id, multiaddr_to_socketaddr,
Expand Down Expand Up @@ -65,10 +64,6 @@ impl PeerStore {
if self.ban_list.is_addr_banned(&addr) {
return Ok(());
}
#[cfg(target_family = "wasm")]
if !matches!(find_type(&addr), TransportType::Ws) {
return Ok(());
}
self.check_purge()?;
let score = self.score_config.default_score;
self.addr_manager
Expand Down Expand Up @@ -180,12 +175,6 @@ impl PeerStore {
&& required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
};

// Any protocol expect websocket
#[cfg(not(target_family = "wasm"))]
let filter = |peer_addr: &AddrInfo| {
filter(peer_addr) && !matches!(find_type(&peer_addr.addr), TransportType::Ws)
};

// get addrs that can attempt.
self.addr_manager.fetch_random(count, filter)
}
Expand Down
15 changes: 14 additions & 1 deletion network/src/peer_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,20 @@ impl AddrInfo {
/// Init
pub fn new(addr: Multiaddr, last_connected_at_ms: u64, score: Score, flags: u64) -> Self {
AddrInfo {
addr,
// only store tcp protocol
addr: addr
.iter()
.filter_map(|p| {
if matches!(
p,
Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_)
) {
None
} else {
Some(p)
}
})
.collect(),
score,
last_connected_at_ms,
last_tried_at_ms: 0,
Expand Down
8 changes: 4 additions & 4 deletions network/src/protocols/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ impl AddressManager for DiscoveryAddressManager {

fn is_valid_addr(&self, addr: &Multiaddr) -> bool {
if !self.discovery_local_address {
let local_or_invalid = multiaddr_to_socketaddr(addr)
.map(|socket_addr| !is_reachable(socket_addr.ip()))
.unwrap_or(true);
!local_or_invalid
match multiaddr_to_socketaddr(addr) {
Some(socket_addr) => is_reachable(socket_addr.ip()),
None => true,
}
} else {
true
}
Expand Down
7 changes: 3 additions & 4 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,9 @@ impl<T: Callback> IdentifyProtocol<T> {
let global_ip_only = self.global_ip_only;
let reachable_addrs = listens
.into_iter()
.filter(|addr| {
multiaddr_to_socketaddr(addr)
.map(|socket_addr| !global_ip_only || is_reachable(socket_addr.ip()))
.unwrap_or(false)
.filter(|addr| match multiaddr_to_socketaddr(addr) {
Some(socket_addr) => !global_ip_only || is_reachable(socket_addr.ip()),
None => true,
})
.collect::<Vec<_>>();
self.callback
Expand Down
Loading
Loading