Skip to content

Commit

Permalink
feat!: split metadata and peer exchange (#234)
Browse files Browse the repository at this point in the history
Description
---
Split the peer exchange into peer exchange and metadata exchange to not
send all peer info every time.
  • Loading branch information
SWvheerden authored Jan 10, 2025
1 parent 754eec7 commit 7be5f28
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub mod grpc;
pub mod http;
pub mod p2p;

pub const PROTOCOL_VERSION: u64 = 29;
pub const PROTOCOL_VERSION: u64 = 30;
12 changes: 12 additions & 0 deletions src/server/p2p/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ impl CatchUpSyncResponse {
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetaDataRequest {
pub peer_id: String,
pub my_info: PeerInfo,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetaDataResponse {
pub peer_id: String,
pub info: PeerInfo,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DirectPeerInfoRequest {
pub peer_id: String,
Expand Down
252 changes: 223 additions & 29 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,7 @@ use tokio::{
};

use super::{
messages::{
CatchUpSyncRequest,
CatchUpSyncResponse,
DirectPeerInfoRequest,
DirectPeerInfoResponse,
NotifyNewTipBlock,
},
messages::{CatchUpSyncRequest, CatchUpSyncResponse, MetaDataRequest, MetaDataResponse, NotifyNewTipBlock},
setup,
};
use crate::{
Expand All @@ -74,7 +68,14 @@ use crate::{
http::stats_collector::StatsBroadcastClient,
p2p::{
client::ServiceClient,
messages::{self, PeerInfo, SyncMissingBlocksRequest, SyncMissingBlocksResponse},
messages::{
self,
DirectPeerInfoRequest,
DirectPeerInfoResponse,
PeerInfo,
SyncMissingBlocksRequest,
SyncMissingBlocksResponse,
},
peer_store::{AddPeerStatus, PeerStore},
relay_store::RelayStore,
},
Expand All @@ -90,6 +91,7 @@ const PEER_INFO_TOPIC: &str = "peer_info";
const BLOCK_NOTIFY_TOPIC: &str = "block_notify";
pub(crate) const SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL: &str = "/share_chain_sync/5";
pub(crate) const DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_direct_peer_info/5";
pub(crate) const META_DATA_EXCHANGE_REQ_RESP_PROTOCOL: &str = "/tari_meta_data_info/5";
pub(crate) const CATCH_UP_SYNC_REQUEST_RESPONSE_PROTOCOL: &str = "/catch_up_sync/5";
const LOG_TARGET: &str = "tari::p2pool::server::p2p";
const SYNC_REQUEST_LOG_TARGET: &str = "sync_request";
Expand All @@ -106,7 +108,8 @@ const MAX_CATCH_UP_BLOCKS_TO_RETURN: usize = 10;
// Time to start up and catch up before we start processing new tip messages
const NUM_PEERS_TO_SYNC_PER_ALGO: usize = 32;
const NUM_PEERS_INITIAL_SYNC: usize = 100;
const NUM_PEERS_TO_HEIGHT_EXCHANGE: usize = 8;
const NUM_PEERS_TO_META_DATA_EXCHANGE: usize = 8;
const NUM_PEERS_TO_PEER_INFO_EXCHANGE: usize = 8;

#[derive(Clone, Debug)]
#[allow(clippy::struct_excessive_bools)]
Expand All @@ -124,7 +127,8 @@ pub(crate) struct Config {
pub user_agent: String,
pub grey_list_clear_interval: Duration,
pub black_list_clear_interval: Duration,
pub chain_height_exchange_interval: Duration,
pub meta_data_exchange_interval: Duration,
pub peer_exchange_interval: Duration,
pub is_seed_peer: bool,
pub debug_print_chain: bool,
pub sync_job_enabled: bool,
Expand All @@ -151,7 +155,8 @@ impl Default for Config {
user_agent: "tari-p2pool".to_string(),
grey_list_clear_interval: Duration::from_secs(60 * 15),
black_list_clear_interval: Duration::from_secs(60 * 60),
chain_height_exchange_interval: Duration::from_secs(5),
meta_data_exchange_interval: Duration::from_secs(5),
peer_exchange_interval: Duration::from_secs(60 * 60),
is_seed_peer: false,
debug_print_chain: false,
sync_job_enabled: true,
Expand Down Expand Up @@ -190,6 +195,7 @@ pub struct ServerNetworkBehaviour {
pub gossipsub: gossipsub::Behaviour,
pub share_chain_sync: cbor::Behaviour<SyncMissingBlocksRequest, Result<SyncMissingBlocksResponse, String>>,
pub direct_peer_exchange: cbor::Behaviour<DirectPeerInfoRequest, Result<DirectPeerInfoResponse, String>>,
pub meta_data_exchange: cbor::Behaviour<MetaDataRequest, Result<MetaDataResponse, String>>,
pub catch_up_sync: cbor::Behaviour<CatchUpSyncRequest, Result<CatchUpSyncResponse, String>>,
pub identify: identify::Behaviour,
pub relay_server: Toggle<relay::Behaviour>,
Expand Down Expand Up @@ -794,6 +800,29 @@ where S: ShareChain
}
}

async fn initiate_meta_data_exchange(&mut self, peer: &PeerId) {
if let Ok(my_info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
.inspect_err(|error| {
error!(target: LOG_TARGET, "Failed to create peer info: {error:?}");
})
{
let local_peer_id = *self.swarm.local_peer_id();
if peer == &local_peer_id {
return;
}

self.swarm
.behaviour_mut()
.meta_data_exchange
.send_request(peer, MetaDataRequest {
my_info,
peer_id: local_peer_id.to_base58(),
});
}
}

async fn handle_direct_peer_exchange_request(
&mut self,
channel: ResponseChannel<Result<DirectPeerInfoResponse, String>>,
Expand Down Expand Up @@ -873,6 +902,133 @@ where S: ShareChain
}
}

async fn handle_meta_data_exchange_request(
&mut self,
channel: ResponseChannel<Result<MetaDataResponse, String>>,
request: MetaDataRequest,
) {
if request.my_info.version != PROTOCOL_VERSION {
// debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping",
// request.peer_id);
let _unused = self
.swarm
.behaviour_mut()
.meta_data_exchange
.send_response(channel, Err("Peer has an outdated version".to_string()))
.inspect_err(|e| {
error!(target: LOG_TARGET, "Failed to send peer info response: {e:?}");
});

return;
}

let source_peer = request.my_info.peer_id;

info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[META_DATA_REQ] New peer info: {source_peer:?}");
let local_peer_id = *self.swarm.local_peer_id();
if let Ok(info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
.await
.inspect_err(|error| {
error!(target: LOG_TARGET, "Failed to create peer info: {error:?}");
})
{
if let Err(e) = self.swarm.behaviour_mut().meta_data_exchange.send_response(
channel,
Ok(MetaDataResponse {
peer_id: local_peer_id.to_base58(),
info,
}),
) {
error!(target: LOG_TARGET, "Failed to send meta data response to {:?}: {:?}", source_peer, e);
}
} else {
error!(target: LOG_TARGET, "Failed to create peer info");
}

match request.peer_id.parse::<PeerId>() {
Ok(peer_id) => {
if self.add_peer(request.my_info, peer_id).await {
// self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
Err(error) => {
error!(target: LOG_TARGET, "Failed to parse peer id: {error:?}");
},
}
}

async fn handle_meta_data_exchange_response(&mut self, response: MetaDataResponse) {
if response.info.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", response.peer_id);
return;
}
info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[META_DATA_EXCHANGE_RESP] New peer info: {}", response.peer_id);
match response.peer_id.parse::<PeerId>() {
Ok(peer_id) => {
if response.info.squad != self.squad {
warn!(target: LOG_TARGET, "Peer {} is not in the same squad, skipping", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
return;
}
if self.add_peer(response.info.clone(), peer_id).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
// Once we have peer info from the seed peers, disconnect from them.
if self.network_peer_store.read().await.is_seed_peer(&peer_id) {
warn!(target: LOG_TARGET, "Disconnecting from seed peer {}", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
return;
}

// If they are talking an older version, disconnect
if response.info.version != PROTOCOL_VERSION {
warn!(target: LOG_TARGET, "Peer {} has an outdated version, disconnecting", peer_id);
let _ = self.swarm.disconnect_peer_id(peer_id);
return;
}
// if we are a seed peer, end here
if self.config.is_seed_peer {
return;
}

let our_tip_sha3x = self.share_chain_sha3x.chain_pow().await;

if self.config.sha3x_enabled && response.info.current_sha3x_pow > our_tip_sha3x.as_u128() {
let perform_catch_up_sync = PerformCatchUpSync {
algo: PowAlgorithm::Sha3x,
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_sha3x_height,
// their_pow: response.info.current_sha3x_pow,
permit: None,
};
let _unused = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
}
let our_tip_rx = self.share_chain_random_x.chain_pow().await;

if self.config.randomx_enabled && response.info.current_random_x_pow > our_tip_rx.as_u128() {
let perform_catch_up_sync = PerformCatchUpSync {
algo: PowAlgorithm::RandomX,
peer: peer_id,
last_block_from_them: None,
their_height: response.info.current_random_x_height,
// their_pow: response.info.current_random_x_pow,
permit: None,
};
let _unused = self
.inner_request_tx
.send(InnerRequest::PerformCatchUpSync(perform_catch_up_sync));
}
},
Err(error) => {
error!(target: LOG_TARGET, "Failed to parse peer id: {error:?}");
},
}
}

async fn handle_direct_peer_exchange_response(&mut self, response: DirectPeerInfoResponse) {
if response.info.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, "Peer {} has an outdated version, skipping", response.peer_id);
Expand Down Expand Up @@ -1285,6 +1441,36 @@ where S: ShareChain
gossipsub::Event::GossipsubNotSupported { .. } => {},
}
},
ServerNetworkBehaviourEvent::MetaDataExchange(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Message::Request {
request_id: _request_id,
request,
channel,
} => {
self.handle_meta_data_exchange_request(channel, request).await;
},
request_response::Message::Response {
request_id: _request_id,
response,
} => match response {
Ok(response) => {
self.handle_meta_data_exchange_response(response).await;
},
Err(error) => {
error!(target: LOG_TARGET, "REQ-RES peer info response error: {error:?}");
},
},
},
request_response::Event::OutboundFailure { peer, error, .. } => {
// Peers can be offline
debug!(target: LOG_TARGET, "REQ-RES meta data outbound failure: {peer:?} -> {error:?}");
},
request_response::Event::InboundFailure { peer, error, .. } => {
error!(target: LOG_TARGET, "REQ-RES meta data inbound failure: {peer:?} -> {error:?}");
},
request_response::Event::ResponseSent { .. } => {},
},
ServerNetworkBehaviourEvent::DirectPeerExchange(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Message::Request {
Expand Down Expand Up @@ -2179,8 +2365,10 @@ where S: ShareChain

let mut black_list_clear_interval = tokio::time::interval(self.config.black_list_clear_interval);
black_list_clear_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut chain_height_exchange_interval = tokio::time::interval(self.config.chain_height_exchange_interval);
chain_height_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut meta_data_exchange_interval = tokio::time::interval(self.config.meta_data_exchange_interval);
meta_data_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut peer_exchange_interval = tokio::time::interval(self.config.peer_exchange_interval);
peer_exchange_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let mut connection_stats_publish = tokio::time::interval(Duration::from_secs(10));
connection_stats_publish.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand All @@ -2200,7 +2388,8 @@ where S: ShareChain
tokio::pin!(shutdown_signal);
tokio::pin!(grey_list_clear_interval);
tokio::pin!(black_list_clear_interval);
tokio::pin!(chain_height_exchange_interval);
tokio::pin!(meta_data_exchange_interval);
tokio::pin!(peer_exchange_interval);
tokio::pin!(connection_stats_publish);
tokio::pin!(seek_connections_interval);

Expand Down Expand Up @@ -2334,29 +2523,34 @@ where S: ShareChain
warn!(target: LOG_TARGET, "Peer info publishing took too long: {:?}", timer.elapsed());
}
},
_ = chain_height_exchange_interval.tick() => {
_ = meta_data_exchange_interval.tick() => {
let timer = Instant::now();
if !self.config.is_seed_peer && self.config.sync_job_enabled {
let mut connected_peers = self.swarm.connected_peers().copied().collect::<Vec::<_>>();
let mut rng = thread_rng();
connected_peers.shuffle(&mut rng);
for peer in connected_peers.iter().take(NUM_PEERS_TO_HEIGHT_EXCHANGE) {
for peer in connected_peers.iter().take(NUM_PEERS_TO_META_DATA_EXCHANGE) {
// Update their latest tip.
self.initiate_direct_peer_exchange(peer).await;
self.initiate_meta_data_exchange(peer).await;

}
// self.try_sync_from_best_peer().await;
}
if timer.elapsed() > MAX_ACCEPTABLE_NETWORK_EVENT_TIMEOUT {
warn!(target: LOG_TARGET, "Chain height exchange took too long: {:?}", timer.elapsed());
}
},

_ = peer_exchange_interval.tick() => {
let timer = Instant::now();
if !self.config.is_seed_peer && self.config.sync_job_enabled {
let mut connected_peers = self.swarm.connected_peers().copied().collect::<Vec::<_>>();
let mut rng = thread_rng();
connected_peers.shuffle(&mut rng);
for peer in connected_peers.iter().take(NUM_PEERS_TO_PEER_INFO_EXCHANGE) {
// Update their latest tip.
self.initiate_direct_peer_exchange(peer).await;

// let _ = self.perform_catch_up_sync(PerformCatchUpSync {
// algo: PowAlgorithm::RandomX,
// peer,
// last_block_from_them: None,
// their_height: 0,
// });
// let _ = self.perform_catch_up_sync(PerformCatchUpSync {
// algo: PowAlgorithm::Sha3x,
// peer,
// last_block_from_them: None,
// their_height: 0,
// });
}
// self.try_sync_from_best_peer().await;
}
Expand Down
Loading

0 comments on commit 7be5f28

Please sign in to comment.