Skip to content

Commit

Permalink
fix: dial less often
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Dec 30, 2024
1 parent 7ddb54d commit c875aa0
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 52 deletions.
92 changes: 53 additions & 39 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ use libp2p::{
ping,
relay,
request_response::{self, cbor, OutboundFailure, OutboundRequestId, ResponseChannel},
swarm::{
behaviour::toggle::Toggle,
dial_opts::{DialOpts, PeerCondition},
DialError,
NetworkBehaviour,
SwarmEvent,
},
swarm::{behaviour::toggle::Toggle, dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent},
Multiaddr,
PeerId,
Swarm,
Expand Down Expand Up @@ -756,9 +750,9 @@ where S: ShareChain

match add_status {
AddPeerStatus::NewPeer => {
self.initiate_direct_peer_exchange(&peer).await;
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
let _unused = self.swarm.dial(peer);
// self.initiate_direct_peer_exchange(&peer).await;
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
// let _unused = self.swarm.dial(peer);
return true;
},
AddPeerStatus::Existing => {},
Expand All @@ -781,6 +775,7 @@ where S: ShareChain
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info: {error:?}");
})
{
dbg!("here");
let local_peer_id = *self.swarm.local_peer_id();
if peer == &local_peer_id {
return;
Expand Down Expand Up @@ -1187,7 +1182,7 @@ where S: ShareChain
info!(target: LOG_TARGET, squad = &self.config.squad; "Connection established: {peer_id:?} -> {endpoint:?} ({num_established:?}/{concurrent_dial_errors:?}/{established_in:?})");
},
SwarmEvent::Dialing { peer_id, .. } => {
info!(target: LOG_TARGET, squad = &self.config.squad; "Dialing: {peer_id:?}");
info!(target: LOG_TARGET, "Dialing: {peer_id:?}");
},
SwarmEvent::NewListenAddr { address, .. } => {
info!(target: LOG_TARGET, squad = &self.config.squad; "Listening on {address:?}");
Expand All @@ -1203,14 +1198,15 @@ where S: ShareChain
if !endpoint.is_dialer() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Connection closed: {peer_id:?} -> {endpoint:?} ({num_established:?}) -> {cause:?}");
}
warn!(target: LOG_TARGET, squad = &self.config.squad; "Connection closed: {peer_id:?} -> {endpoint:?} ({num_established:?}) -> {cause:?}");
},
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error,
} => {
info!(target: LOG_TARGET, squad = &self.config.squad; "Incoming connection error: {connection_id:?} -> {local_addr:?} -> {send_back_addr:?} -> {error:?}");
info!(target: LOG_TARGET, "Incoming connection error: {connection_id:?} -> {local_addr:?} -> {send_back_addr:?} -> {error:?}");
},
SwarmEvent::ListenerError { listener_id, error } => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Listener error: {listener_id:?} -> {error:?}");
Expand All @@ -1227,7 +1223,7 @@ where S: ShareChain
match error {
DialError::Transport(transport_error) => {
// There are a lot of cancelled errors, so ignore them
debug!(target: LOG_TARGET, "Outgoing connection error, ignoring: {peer_id:?} -> {transport_error:?}");
warn!(target: LOG_TARGET, "Outgoing connection error, ignoring: {peer_id:?} -> {transport_error:?}");
},
_ => {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Outgoing connection error: {peer_id:?} -> {error:?}");
Expand Down Expand Up @@ -1966,15 +1962,15 @@ where S: ShareChain
addresses.truncate(8);

// Try dial, this should already be happening though
if let Err(err) = self.swarm.dial(
DialOpts::peer_id(relay.peer_id)
.addresses(relay.addresses.clone())
// .condition(PeerCondition::NotDialing)
.build(),
) {
debug!(target: LOG_TARGET, "🚨 Failed to dial relay: {}", err);
// return;
}
// if let Err(err) = self.swarm.dial(
// DialOpts::peer_id(relay.peer_id)
// .addresses(relay.addresses.clone())
// // .condition(PeerCondition::NotDialing)
// .build(),
// ) {
// debug!(target: LOG_TARGET, "🚨 Failed to dial relay: {}", err);
// // return;
// }

addresses.iter().for_each(|addr| {
let listen_addr = addr.clone().with(Protocol::P2pCircuit);
Expand Down Expand Up @@ -2172,7 +2168,7 @@ where S: ShareChain
let mut connection_stats_publish = tokio::time::interval(Duration::from_secs(10));
connection_stats_publish.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut seek_connections_interval = tokio::time::interval(Duration::from_secs(5));
let mut seek_connections_interval = tokio::time::interval(Duration::from_secs(20));
seek_connections_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut debug_chain_graph = if self.config.debug_print_chain {
Expand All @@ -2191,6 +2187,7 @@ where S: ShareChain
tokio::pin!(connection_stats_publish);
tokio::pin!(seek_connections_interval);

let uptime = Instant::now();
loop {
// info!(target: LOG_TARGET, "P2P service main loop iter");
select! {
Expand Down Expand Up @@ -2220,11 +2217,13 @@ where S: ShareChain
let info = self.swarm.network_info();
let counters = info.connection_counters();

let num_connections = counters.num_established_incoming() + counters.num_established_outgoing();
if num_connections > 20 {
// let num_connections = counters.num_established_incoming() + counters.num_established_outgoing();
let num_connections = counters.num_established_outgoing();
if num_connections > 8 {
continue;
}
if num_connections == 0 {
if num_connections == 0 && uptime.elapsed() < Duration::from_secs(60) {

match self.dial_seed_peers().await {
Ok(_) => {},
Err(e) => {
Expand All @@ -2236,26 +2235,44 @@ where S: ShareChain

let mut num_dialed = 0;
let mut store_write_lock = self.network_peer_store.write().await;
// Rather try and search good peers rather than randomly dialing
// 1000 peers will take a long time to get through

let mut peers_to_dial = vec![];
for record in store_write_lock.best_peers_to_dial(100) {
debug!(target: LOG_TARGET, "Dialing peer: {:?} with height(rx/sha) {}/{}", record.peer_id, record.peer_info.current_random_x_height, record.peer_info.current_sha3x_height);
// dbg!(&record.peer_id);
// Only dial seed peers if we have 0 connections
if !self.swarm.is_connected(&record.peer_id)
&& !store_write_lock.is_seed_peer(&record.peer_id) {
// if &record.peer_id.to_string() != "12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS" {
// store_write_lock.update_last_dial_attempt(&record.peer_id);
// info!(target: LOG_TARGET, "Skipping dialing peer: {:?} with height(rx/sha) {}/{} on {}", record.peer_id, record.peer_info.current_random_x_height, record.peer_info.current_sha3x_height, record.peer_info.public_addresses().iter().map(|a| a.to_string()).collect::<Vec<String>>().join(", "));
// continue;
// }
store_write_lock.update_last_dial_attempt(&record.peer_id);
let dial_opts= DialOpts::peer_id(record.peer_id).condition(PeerCondition::Always).addresses(record.peer_info.public_addresses().clone()).extend_addresses_through_behaviour().build();
let _unused = self.swarm.dial(dial_opts);
info!(target: LOG_TARGET, "Dialing peer: {:?} with height(rx/sha) {}/{} on {}", record.peer_id, record.peer_info.current_random_x_height, record.peer_info.current_sha3x_height, record.peer_info.public_addresses().iter().map(|a| a.to_string()).collect::<Vec<String>>().join(", "));
let dial_opts= DialOpts::peer_id(record.peer_id).addresses(record.peer_info.public_addresses().clone()).extend_addresses_through_behaviour().build();
// let dial_opts= DialOpts::peer_id(record.peer_id).addresses(vec!["/ip4/152.228.210.16/tcp/19001/p2p/12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS".parse().unwrap(), "/ip4/152.228.210.16/udp/19001/quic-v1/p2p/12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS".parse().unwrap()]).build();
// let dial_opts = DialOpts::unknown_peer_id().address("/ip4/152.228.210.16/tcp/19001/p2p/12D3KooWD6GY3c8cz6AwKaDaqmqGCbmewhjKT5ULN9JUB5oUgWjS".parse().unwrap()).build();
let _unused = self.swarm.dial(dial_opts).map_err(|e| {
warn!(target: LOG_TARGET, "Failed to dial peer: {e:?}");
});
// self.initiate_direct_peer_exchange(&record.peer_id).await;
peers_to_dial.push(record.peer_id);
num_dialed += 1;
// We can only do 30 connections
// after 30 it starts cancelling dials
if num_dialed > 80 {
if num_dialed > 10 {
break;
}
}


}
}
drop(store_write_lock);
// for peer in peers_to_dial {
// dbg!("trying");
// self.initiate_direct_peer_exchange(&peer).await;
// }
}
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
warn!(target: LOG_TARGET, "Seeking connections took too long: {:?}", timer.elapsed());
}
Expand Down Expand Up @@ -2514,12 +2531,9 @@ where S: ShareChain
// self.swarm.behaviour_mut().kademlia.add_address(peer_id, addr.clone());
self.swarm.add_peer_address(*peer_id, addr.clone());
peers_to_add.push(*peer_id);
let _unused = self
.swarm
.dial(DialOpts::peer_id(*peer_id).condition(PeerCondition::Always).build())
.inspect_err(|e| {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to dial seed peer: {e:?}");
});
let _unused = self.swarm.dial(DialOpts::peer_id(*peer_id).build()).inspect_err(|e| {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to dial seed peer: {e:?}");
});
});
self.network_peer_store.write().await.add_seed_peers(peers_to_add);

Expand Down
2 changes: 1 addition & 1 deletion src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl PeerStore {
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
peers.retain(|peer| {
!peer.peer_info.public_addresses().is_empty() &&
(peer.last_dial_attempt.is_none() || peer.last_dial_attempt.unwrap().elapsed().as_secs() > 60)
(peer.last_dial_attempt.is_none() || peer.last_dial_attempt.unwrap().elapsed().as_secs() > 120)
});
peers.sort_by(|a, b| {
b.num_grey_listings
Expand Down
6 changes: 3 additions & 3 deletions src/server/p2p/relay_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ impl RelayStore {
}

pub fn select_random_relay(&mut self) {
let Some((peer, addrs)) = self.possible_relays.iter().choose(&mut rand::thread_rng()) else {
let Some((_peer, addrs)) = self.possible_relays.iter().choose(&mut rand::thread_rng()) else {
return;
};
self.selected_relay = Some(RelayPeer {
peer_id: *peer,
// peer_id: *peer,
addresses: addrs.iter().cloned().collect(),
is_circuit_established: false,
});
Expand All @@ -35,7 +35,7 @@ impl RelayStore {

#[derive(Debug, Clone)]
pub struct RelayPeer {
pub peer_id: PeerId,
// pub peer_id: PeerId,
pub addresses: Vec<Multiaddr>,
pub is_circuit_established: bool,
}
3 changes: 1 addition & 2 deletions src/server/p2p/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result<Swarm<ServerNet
let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair(&config.p2p_service).await?)
.with_tokio()
.with_tcp(tcp::Config::default().nodelay(true), // Nodelay helps with hole punching
noise::Config::new, yamux::Config::default)
?
noise::Config::new, yamux::Config::default)?
.with_quic_config(|mut config| {
config.handshake_timeout = Duration::from_secs(30);
config
Expand Down
17 changes: 10 additions & 7 deletions src/sharechain/p2chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ impl<T: BlockCache> P2Chain<T> {
let mut new_chain = Self::new_empty(algo, total_size, share_window, block_time, new_block_cache);
for block in from_block_cache.all_blocks()? {
info!(target: LOG_TARGET, "Loading block {}({:x}{:x}{:x}{:x}) into chain", block.height, block.hash[0], block.hash[1], block.hash[2], block.hash[3]);
new_chain.add_block_to_chain(block)?;
let _ = new_chain.add_block_to_chain(block).inspect_err(|e| {
error!(target: LOG_TARGET, "Failed to load block into chain: {}", e);
});
}
Ok(new_chain)
}
Expand Down Expand Up @@ -466,12 +468,13 @@ impl<T: BlockCache> P2Chain<T> {
let nextblock = parent_level.get_header(&current_block.prev_hash);
if nextblock.is_none() {
error!(target: LOG_TARGET, "FATAL: Reorging (block in chain) failed because parent block was not found and chain data is corrupted.");
panic!(
"FATAL: Reorging (block in chain) failed because parent block was not found and chain \
data is corrupted. current_block: {:?}, current tip: {:?}",
current_block.height,
self.get_tip().map(|t| t.height())
);
return Err(ShareChainError::BlockNotFound);
// panic!(
//     "FATAL: Reorging (block in chain) failed because parent block was not found and chain \
//      data is corrupted. current_block: {:?}, current tip: {:?}",
//     current_block.height,
//     self.get_tip().map(|t| t.height())
// );
}
// fix the main chain
let mut_parent_level = self.level_at_height(current_block.height.saturating_sub(1)).unwrap();
Expand Down

0 comments on commit c875aa0

Please sign in to comment.