Skip to content

Commit

Permalink
improve peer sync by better gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden committed Feb 27, 2025
1 parent ad41c59 commit 2c9d202
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl StatsCollector {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b/o) {}/{}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(tot/gr/bl/non) {}/{}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64())
Expand Down
48 changes: 21 additions & 27 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl Default for Config {
Self {
external_addr: None,
seed_peers: vec![],
peer_info_publish_interval: Duration::from_secs(60 * 15),
peer_info_publish_interval: Duration::from_secs(60 * 5),
stable_peer: true,
private_key_folder: PathBuf::from("."),
private_key: None,
Expand Down Expand Up @@ -525,7 +525,7 @@ where S: ShareChain
topic if topic == Self::network_topic(PEER_INFO_TOPIC) => {
match messages::PeerInfo::try_from(message) {
Ok(payload) => {
debug!(target: LOG_TARGET, "[squad] New peer info: {source_peer:?} -> {payload:?}");
debug!(target: LOG_TARGET, "[PEER_INFO_TOPIC] New peer info: {source_peer:?} -> {payload:?}");
if payload.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", source_peer);
return Ok(MessageAcceptance::Reject);
Expand All @@ -537,18 +537,8 @@ where S: ShareChain
// TODO: should be punish
return Ok(MessageAcceptance::Ignore);
}
if payload.peer_id.as_ref() == Some(self.swarm.local_peer_id()) {
return Ok(MessageAcceptance::Ignore);
}

if payload.peer_id != Some(source_peer) {
warn!(target: LOG_TARGET, "Peer {} sent a peer info message with a different peer id: {}, skipping", source_peer, payload.peer_id.as_ref().map(|p| p.to_string()).unwrap_or("None".to_string()));
// return Ok(MessageAcceptance::Ignore);
}
debug!(target: PEER_INFO_LOGGING_LOG_TARGET, "[SQUAD_PEERINFO_TOPIC] New peer info: {source_peer:?} -> {payload:?}");

self.add_peer(payload, source_peer).await;
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&source_peer);
return Ok(MessageAcceptance::Accept);
},
Err(error) => {
Expand Down Expand Up @@ -731,19 +721,11 @@ where S: ShareChain
return false;
}

// if payload.squad != self.squad {
// debug!(target: LOG_TARGET, "Peer {} is not in the same squad, skipping", peer);
// return false;
// }

let public_addresses = payload.public_addresses();
let add_status = self.network_peer_store.write().await.add(peer, payload).await;

match add_status {
AddPeerStatus::NewPeer => {
// self.initiate_direct_peer_exchange(&peer).await;
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
// let _unused = self.swarm.dial(peer);
for addr in &public_addresses {
self.swarm.add_peer_address(peer, addr.clone());
}
Expand All @@ -752,15 +734,18 @@ where S: ShareChain
},
AddPeerStatus::Existing => {
trace!(target: LOG_TARGET, "Peer was already added");
return true;
},
AddPeerStatus::Greylisted => {
debug!(target: LOG_TARGET, "Added peer but it was grey listed");
},
AddPeerStatus::Blacklisted => {
debug!(target: LOG_TARGET, "Added peer {} but it was black listed", peer);
return true;
},
AddPeerStatus::NonSquad => {
debug!(target: LOG_TARGET, "Added peer {} but it was not in the same squad", peer);
return true;
},
AddPeerStatus::Blacklisted => {
debug!(target: LOG_TARGET, "Added peer {} but it was black listed", peer);
},
}

Expand Down Expand Up @@ -1375,6 +1360,10 @@ where S: ShareChain
DialError::Transport(transport_error) => {
// There are a lot of cancelled errors, so ignore them
warn!(target: LOG_TARGET, "Outgoing connection error, ignoring: {peer_id:?} -> {transport_error:?}");
// self.network_peer_store
// .write()
// .await
// .move_to_grey_list(peer_id, format!("Outgoing connection error: {:?}", transport_error));
},
_ => {
warn!(target: LOG_TARGET, "Outgoing connection error: {peer_id:?} -> {error:?}");
Expand Down Expand Up @@ -1436,7 +1425,7 @@ where S: ShareChain
}
},
ServerNetworkBehaviourEvent::MetaDataExchange(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Event::Message { peer, message } => match message {
request_response::Message::Request {
request_id: _request_id,
request,
Expand All @@ -1452,7 +1441,7 @@ where S: ShareChain
self.handle_meta_data_exchange_response(response).await;
},
Err(error) => {
error!(target: LOG_TARGET, "REQ-RES peer info response error: {error:?}");
error!(target: LOG_TARGET, "REQ-RES peer: {peer} info response error: {error:?}");
},
},
},
Expand All @@ -1466,7 +1455,7 @@ where S: ShareChain
request_response::Event::ResponseSent { .. } => {},
},
ServerNetworkBehaviourEvent::DirectPeerExchange(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Event::Message { peer, message } => match message {
request_response::Message::Request {
request_id: _request_id,
request,
Expand All @@ -1482,7 +1471,7 @@ where S: ShareChain
self.handle_direct_peer_exchange_response(response).await;
},
Err(error) => {
error!(target: LOG_TARGET, "REQ-RES peer info response error: {error:?}");
error!(target: LOG_TARGET, "REQ-RES peer: {peer} info response error: {error:?}");
},
},
},
Expand Down Expand Up @@ -2504,6 +2493,11 @@ where S: ShareChain
}
},
event = self.swarm.select_next_some() => {
let public_addresses: Vec<Multiaddr> = self.swarm.external_addresses().cloned().collect();
warn!(target: LOG_TARGET, "We have this many externaled address: {:?}", public_addresses.len());
for addr in public_addresses {
warn!(target: LOG_TARGET, "External address: {}", addr);
}
let timer = Instant::now();
self.handle_event(event).await;
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
Expand Down
36 changes: 15 additions & 21 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl PeerStoreRecord {
}
}

#[derive(PartialEq)]
pub enum AddPeerStatus {
NewPeer,
Existing,
Expand Down Expand Up @@ -206,11 +207,6 @@ impl PeerStore {
}

pub fn best_peers_to_share(&self, count: usize, squad: &str, other_nodes_peers: &[PeerId]) -> Vec<PeerStoreRecord> {
// let mut peers = if squad == self.my_squad {
// self.whitelist_peers.values().collect::<Vec<_>>()
// } else {
// self.non_squad_peers.values().collect::<Vec<_>>()
// };
let mut peers = self.whitelist_peers.values().collect::<Vec<_>>();
peers.extend(self.non_squad_peers.values().collect::<Vec<_>>());
peers.retain(|peer| !peer.peer_info.public_addresses().is_empty() && peer.last_ping.is_some());
Expand Down Expand Up @@ -309,20 +305,23 @@ impl PeerStore {
return AddPeerStatus::Greylisted;
}

if self.non_squad_peers.contains_key(&peer_id.to_base58()) {
return AddPeerStatus::Existing;
}

if peer_info.squad != self.my_squad {
let return_type = if self.non_squad_peers.contains_key(&peer_id.to_base58()) {
AddPeerStatus::NonSquad
} else {
AddPeerStatus::Existing
};
self.non_squad_peers
.insert(peer_id.to_base58(), PeerStoreRecord::new(peer_id, peer_info));
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
return AddPeerStatus::NonSquad;
if return_type == AddPeerStatus::NonSquad {
let _unused = self.stats_broadcast_client.send_new_peer(
self.whitelist_peers.len() as u64,
self.greylist_peers.len() as u64,
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);
}
return return_type;
}

if let Some(entry) = self.whitelist_peers.get_mut(&peer_id.to_base58()) {
Expand All @@ -349,11 +348,6 @@ impl PeerStore {
self.blacklist_peers.len() as u64,
self.non_squad_peers.len() as u64,
);

// self.peer_removals.insert(peer_id, removal_count).await;
// }

// self.set_tip_of_block_heights().await;
AddPeerStatus::NewPeer
}

Expand Down

0 comments on commit 2c9d202

Please sign in to comment.