diff --git a/CHANGELOG.md b/CHANGELOG.md index 73a844b431..aae051bfea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,13 +7,16 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE ## [Unreleased] -### Added +## Added - Add `vrf_seed` to the `/v3/sortitions` rpc endpoint ### Changed - Miner will stop waiting for signatures on a block if the Stacks tip advances (causing the block it had proposed to be invalid). +- Logging improvements: + - P2P logs now includes a reason for dropping a peer or neighbor + - Improvements to how a PeerAddress is logged (human readable format vs hex) ### Fixed diff --git a/stacks-common/src/types/net.rs b/stacks-common/src/types/net.rs index 06a480bb71..2f81ba0f06 100644 --- a/stacks-common/src/types/net.rs +++ b/stacks-common/src/types/net.rs @@ -118,15 +118,12 @@ impl PeerAddress { ) } - /// Convert to SocketAddr - pub fn to_socketaddr(&self, port: u16) -> SocketAddr { + /// Convert to IpAddr + pub fn to_ipaddr(&self) -> IpAddr { if self.is_ipv4() { - SocketAddr::new( - IpAddr::V4(Ipv4Addr::new( - self.0[12], self.0[13], self.0[14], self.0[15], - )), - port, - ) + IpAddr::V4(Ipv4Addr::new( + self.0[12], self.0[13], self.0[14], self.0[15], + )) } else { let addr_words: [u16; 8] = [ ((self.0[0] as u16) << 8) | (self.0[1] as u16), @@ -138,23 +135,25 @@ impl PeerAddress { ((self.0[12] as u16) << 8) | (self.0[13] as u16), ((self.0[14] as u16) << 8) | (self.0[15] as u16), ]; - - SocketAddr::new( - IpAddr::V6(Ipv6Addr::new( - addr_words[0], - addr_words[1], - addr_words[2], - addr_words[3], - addr_words[4], - addr_words[5], - addr_words[6], - addr_words[7], - )), - port, - ) + IpAddr::V6(Ipv6Addr::new( + addr_words[0], + addr_words[1], + addr_words[2], + addr_words[3], + addr_words[4], + addr_words[5], + addr_words[6], + addr_words[7], + )) } } + /// Convert to SocketAddr + pub fn to_socketaddr(&self, port: u16) -> SocketAddr { + let ip_addr = self.to_ipaddr(); + SocketAddr::new(ip_addr, port) + } + /// Convert from socket address pub fn from_socketaddr(addr: &SocketAddr) -> PeerAddress { PeerAddress::from_ip(&addr.ip()) @@ -228,6 +227,10 @@ impl PeerAddress { pub fn to_bin(&self) -> String { to_bin(&self.0) } + + pub fn pretty_print(&self) -> String { + self.to_ipaddr().to_string() + } } /// Peer address variants for the Host: header diff --git a/stackslib/src/net/download/epoch2x.rs b/stackslib/src/net/download/epoch2x.rs index f832457259..31623be9de 100644 --- a/stackslib/src/net/download/epoch2x.rs +++ b/stackslib/src/net/download/epoch2x.rs @@ -22,6 +22,7 @@ use std::sync::mpsc::{ sync_channel, Receiver, RecvError, RecvTimeoutError, SyncSender, TryRecvError, TrySendError, }; +use p2p::DropSource; use rand::seq::SliceRandom; use rand::{thread_rng, RngCore}; use stacks_common::types::chainstate::{BlockHeaderHash, PoxId, SortitionId, StacksBlockId}; @@ -233,7 +234,7 @@ pub struct BlockDownloader { /// statistics on peers' data-plane endpoints pub(crate) dead_peers: Vec, pub(crate) broken_peers: Vec, - broken_neighbors: Vec, // disconnect peers who report invalid block inventories too + broken_neighbors: Vec, // disconnect peers who report invalid block inventories too pub(crate) blocked_urls: HashMap, // URLs that chronically don't work, and when we can try them again @@ -500,7 +501,13 @@ impl BlockDownloader { { info!("Invalid block from {:?} ({:?}): did not ask for block {}/{}", &block_key.neighbor, &block_key.data_url, block_key.consensus_hash, block.block_hash()); self.broken_peers.push(event_id); - self.broken_neighbors.push(block_key.neighbor.clone()); + self.broken_neighbors.push(DropNeighbor { + key: block_key.neighbor.clone(), + reason: DropReason::BrokenConnection( + "Remote neighbor sent an invalid block".into(), + ), + source: DropSource::BlockDownloaderGetBlocks, + }); } else { // got the block debug!( @@ -519,14 +526,25 @@ impl BlockDownloader { // the fact that we asked this peer means that it's block inv indicated // it was present, so the absence is the mark of a broken peer self.broken_peers.push(event_id); - self.broken_neighbors.push(block_key.neighbor.clone()); + self.broken_neighbors.push(DropNeighbor { + key: block_key.neighbor.clone(), + reason: DropReason::BrokenConnection( + "Remote neighbor was missing an expected block" + .into(), + ), + source: DropSource::BlockDownloaderGetBlocks, + }); } Err(e) => { info!("Error decoding response from remote neighbor {:?} (at {}): {:?}", &block_key.neighbor, &block_key.data_url, &e; "consensus_hash" => %block_key.consensus_hash ); self.broken_peers.push(event_id); - self.broken_neighbors.push(block_key.neighbor.clone()); + self.broken_neighbors.push(DropNeighbor { + key: block_key.neighbor.clone(), + reason: DropReason::BrokenConnection(format!("Error occurred decoding block response from neighbor: {e}")), + source: DropSource::BlockDownloaderGetBlocks + }); } } } @@ -632,7 +650,11 @@ impl BlockDownloader { "consensus_hash" => %block_key.consensus_hash ); self.broken_peers.push(event_id); - self.broken_neighbors.push(block_key.neighbor.clone()); + self.broken_neighbors.push(DropNeighbor { + key: block_key.neighbor.clone(), + reason: DropReason::BrokenConnection("Remote neighbor sent an unexpected zero-length microblock stream".into()), + source: DropSource::BlockDownloaderGetMicroblocks + }); } else { // have microblocks (but we don't know yet if they're well-formed) debug!( @@ -664,7 +686,11 @@ impl BlockDownloader { "consensus_hash" => %block_key.consensus_hash ); self.broken_peers.push(event_id); - self.broken_neighbors.push(block_key.neighbor.clone()); + self.broken_neighbors.push(DropNeighbor { + key: block_key.neighbor.clone(), + reason: DropReason::BrokenConnection(format!("Error occurred decoding microblock response from neighbor: {e}")), + source: DropSource::BlockDownloaderGetMicroblocks + }); } } } @@ -890,7 +916,7 @@ impl BlockDownloader { } /// Clear out broken peers that told us they had blocks, but didn't serve them. - fn clear_broken_peers(&mut self) -> (Vec, Vec) { + fn clear_broken_peers(&mut self) -> (Vec, Vec) { // remove dead/broken peers let mut disconnect = vec![]; let mut disconnect_neighbors = vec![]; @@ -2341,7 +2367,7 @@ impl PeerNetwork { Vec<(ConsensusHash, StacksBlock, u64)>, Vec<(ConsensusHash, Vec, u64)>, Vec, - Vec, + Vec, ), net_error, > { diff --git a/stackslib/src/net/download/nakamoto/download_state_machine.rs b/stackslib/src/net/download/nakamoto/download_state_machine.rs index 3f60752d1d..c117c24c33 100644 --- a/stackslib/src/net/download/nakamoto/download_state_machine.rs +++ b/stackslib/src/net/download/nakamoto/download_state_machine.rs @@ -63,7 +63,7 @@ use crate::net::inv::epoch2x::InvState; use crate::net::inv::nakamoto::{NakamotoInvStateMachine, NakamotoTenureInv}; use crate::net::neighbors::rpc::NeighborRPC; use crate::net::neighbors::NeighborComms; -use crate::net::p2p::{CurrentRewardSet, PeerNetwork}; +use crate::net::p2p::{CurrentRewardSet, DropReason, DropSource, PeerNetwork}; use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; @@ -1204,7 +1204,12 @@ impl NakamotoDownloadStateMachine { "Downloader for {} failed; this peer is dead: {:?}", &naddr, &e ); - neighbor_rpc.add_dead(network, naddr); + neighbor_rpc.add_dead( + network, + naddr, + DropReason::DeadConnection(format!("Failed to send download request: {e}")), + DropSource::NakamotoDownloadStateMachine, + ); continue; }; } @@ -1236,12 +1241,24 @@ impl NakamotoDownloadStateMachine { ) { Ok(blocks_opt) => blocks_opt, Err(NetError::StaleView) => { - neighbor_rpc.add_dead(network, &naddr); + neighbor_rpc.add_dead( + network, + &naddr, + DropReason::DeadConnection("Stale view".into()), + DropSource::NakamotoDownloadStateMachine, + ); continue; } Err(e) => { debug!("Failed to handle next download response from unconfirmed downloader for {:?} in state {:?}: {:?}", &naddr, &downloader.state, &e); - neighbor_rpc.add_dead(network, &naddr); + neighbor_rpc.add_dead( + network, + &naddr, + DropReason::DeadConnection(format!( + "Failed to handle next download response: {e}" + )), + DropSource::NakamotoDownloadStateMachine, + ); continue; } }; diff --git a/stackslib/src/net/download/nakamoto/mod.rs b/stackslib/src/net/download/nakamoto/mod.rs index eb43d8aecd..48910e0a7d 100644 --- a/stackslib/src/net/download/nakamoto/mod.rs +++ b/stackslib/src/net/download/nakamoto/mod.rs @@ -152,7 +152,7 @@ use crate::net::inv::epoch2x::InvState; use crate::net::inv::nakamoto::{NakamotoInvStateMachine, NakamotoTenureInv}; use crate::net::neighbors::rpc::NeighborRPC; use crate::net::neighbors::NeighborComms; -use crate::net::p2p::PeerNetwork; +use crate::net::p2p::{DropReason, PeerNetwork}; use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; @@ -236,11 +236,11 @@ impl PeerNetwork { }; for broken in block_downloader.neighbor_rpc.take_broken() { - self.deregister_and_ban_neighbor(&broken); + self.deregister_and_ban_neighbor(&broken.key, broken.reason, broken.source); } for dead in block_downloader.neighbor_rpc.take_dead() { - self.deregister_neighbor(&dead); + self.deregister_neighbor(&dead.key, dead.reason, dead.source); } self.block_downloader_nakamoto = Some(block_downloader); diff --git a/stackslib/src/net/download/nakamoto/tenure_downloader.rs b/stackslib/src/net/download/nakamoto/tenure_downloader.rs index 6e98703956..14e6c20eeb 100644 --- a/stackslib/src/net/download/nakamoto/tenure_downloader.rs +++ b/stackslib/src/net/download/nakamoto/tenure_downloader.rs @@ -57,7 +57,7 @@ use crate::net::inv::epoch2x::InvState; use crate::net::inv::nakamoto::{NakamotoInvStateMachine, NakamotoTenureInv}; use crate::net::neighbors::rpc::NeighborRPC; use crate::net::neighbors::NeighborComms; -use crate::net::p2p::{CurrentRewardSet, PeerNetwork}; +use crate::net::p2p::{CurrentRewardSet, DropReason, DropSource, PeerNetwork}; use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; @@ -745,7 +745,12 @@ impl NakamotoTenureDownloader { let Some(peerhost) = NeighborRPC::get_peer_host(network, &self.naddr) else { // no conversation open to this neighbor - neighbor_rpc.add_dead(network, &self.naddr); + neighbor_rpc.add_dead( + network, + &self.naddr, + DropReason::DeadConnection("No authenticated connection open".into()), + DropSource::NakamotoTenureDownloader, + ); return Err(NetError::PeerNotConnected); }; diff --git a/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs b/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs index d73342164e..5a1990961b 100644 --- a/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs +++ b/stackslib/src/net/download/nakamoto/tenure_downloader_set.rs @@ -42,7 +42,7 @@ use crate::chainstate::nakamoto::{ NakamotoBlock, NakamotoBlockHeader, NakamotoChainState, NakamotoStagingBlocksConnRef, }; use crate::chainstate::stacks::boot::RewardSet; -use crate::chainstate::stacks::db::StacksChainState; +use crate::chainstate::stacks::db::{blocks, StacksChainState}; use crate::chainstate::stacks::{ Error as chainstate_error, StacksBlockHeader, TenureChangePayload, }; @@ -62,7 +62,7 @@ use crate::net::inv::epoch2x::InvState; use crate::net::inv::nakamoto::{NakamotoInvStateMachine, NakamotoTenureInv}; use crate::net::neighbors::rpc::NeighborRPC; use crate::net::neighbors::NeighborComms; -use crate::net::p2p::{CurrentRewardSet, PeerNetwork}; +use crate::net::p2p::{CurrentRewardSet, DropReason, DropSource, PeerNetwork}; use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; @@ -582,25 +582,33 @@ impl NakamotoTenureDownloaderSet { "Send request to {naddr} for tenure {} (state {})", &downloader.tenure_id_consensus_hash, &downloader.state ); - let Ok(sent) = downloader.send_next_download_request(network, neighbor_rpc) else { - info!( - "Downloader for tenure {} to {naddr} failed; this peer is dead", - &downloader.tenure_id_consensus_hash, - ); - Self::mark_failed_and_deprioritize_peer( - &mut self.attempt_failed_tenures, - &mut self.deprioritized_peers, - &downloader.tenure_id_consensus_hash, - naddr, - ); - neighbor_rpc.add_dead(network, naddr); - continue; + match downloader.send_next_download_request(network, neighbor_rpc) { + Ok(true) => {} + Ok(false) => { + // this downloader is dead or broken + finished.push(naddr.clone()); + continue; + } + Err(e) => { + info!( + "Downloader for tenure {} to {naddr} failed; this peer is dead", + &downloader.tenure_id_consensus_hash, + ); + Self::mark_failed_and_deprioritize_peer( + &mut self.attempt_failed_tenures, + &mut self.deprioritized_peers, + &downloader.tenure_id_consensus_hash, + naddr, + ); + neighbor_rpc.add_dead( + network, + naddr, + DropReason::DeadConnection(format!("Download request failed: {e}")), + DropSource::NakamotoTenureDownloader, + ); + continue; + } }; - if !sent { - // this downloader is dead or broken - finished.push(naddr.clone()); - continue; - } } // clear dead, broken, and done @@ -630,32 +638,30 @@ impl NakamotoTenureDownloaderSet { }; debug!("Got response from {naddr}"); - let Ok(blocks_opt) = downloader - .handle_next_download_response(response) - .map_err(|e| { + let blocks = match downloader.handle_next_download_response(response) { + Ok(Some(blocks)) => blocks, + Ok(None) => continue, + Err(e) => { info!( "Failed to handle response from {naddr} on tenure {}: {e}", &downloader.tenure_id_consensus_hash, ); - e - }) - else { - debug!( - "Failed to handle download response from {naddr} on tenure {}", - &downloader.tenure_id_consensus_hash - ); - Self::mark_failed_and_deprioritize_peer( - &mut self.attempt_failed_tenures, - &mut self.deprioritized_peers, - &downloader.tenure_id_consensus_hash, - &naddr, - ); - neighbor_rpc.add_dead(network, &naddr); - continue; - }; - - let Some(blocks) = blocks_opt else { - continue; + Self::mark_failed_and_deprioritize_peer( + &mut self.attempt_failed_tenures, + &mut self.deprioritized_peers, + &downloader.tenure_id_consensus_hash, + &naddr, + ); + neighbor_rpc.add_dead( + network, + &naddr, + DropReason::DeadConnection(format!( + "Failed to handle download response: {e}" + )), + DropSource::NakamotoTenureDownloader, + ); + continue; + } }; debug!( diff --git a/stackslib/src/net/download/nakamoto/tenure_downloader_unconfirmed.rs b/stackslib/src/net/download/nakamoto/tenure_downloader_unconfirmed.rs index 2a93ba758b..2a330edb78 100644 --- a/stackslib/src/net/download/nakamoto/tenure_downloader_unconfirmed.rs +++ b/stackslib/src/net/download/nakamoto/tenure_downloader_unconfirmed.rs @@ -62,7 +62,7 @@ use crate::net::inv::epoch2x::InvState; use crate::net::inv::nakamoto::{NakamotoInvStateMachine, NakamotoTenureInv}; use crate::net::neighbors::rpc::NeighborRPC; use crate::net::neighbors::NeighborComms; -use crate::net::p2p::{CurrentRewardSet, PeerNetwork}; +use crate::net::p2p::{CurrentRewardSet, DropReason, DropSource, PeerNetwork}; use crate::net::server::HttpPeer; use crate::net::{Error as NetError, Neighbor, NeighborAddress, NeighborKey}; use crate::util_lib::db::{DBConn, Error as DBError}; @@ -838,7 +838,12 @@ impl NakamotoUnconfirmedTenureDownloader { let Some(peerhost) = NeighborRPC::get_peer_host(network, &self.naddr) else { // no conversation open to this neighbor - neighbor_rpc.add_dead(network, &self.naddr); + neighbor_rpc.add_dead( + network, + &self.naddr, + DropReason::DeadConnection("No authenticated connection open".into()), + DropSource::NakamotoUnconfirmedTenureDownloader, + ); return Err(NetError::PeerNotConnected); }; diff --git a/stackslib/src/net/inv/epoch2x.rs b/stackslib/src/net/inv/epoch2x.rs index 430189c41e..95eda2741a 100644 --- a/stackslib/src/net/inv/epoch2x.rs +++ b/stackslib/src/net/inv/epoch2x.rs @@ -19,6 +19,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::io::{Read, Write}; use std::net::SocketAddr; +use p2p::DropSource; use rand; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; @@ -39,7 +40,7 @@ use crate::net::codec::*; use crate::net::connection::{ConnectionOptions, ConnectionP2P, ReplyHandleP2P}; use crate::net::db::{PeerDB, *}; use crate::net::neighbors::MAX_NEIGHBOR_BLOCK_DELAY; -use crate::net::p2p::{PeerNetwork, PeerNetworkWorkState}; +use crate::net::p2p::{DropReason, PeerNetwork, PeerNetworkWorkState}; use crate::net::{ Error as net_error, GetBlocksInv, Neighbor, NeighborKey, PeerAddress, StacksMessage, StacksP2P, *, @@ -2302,7 +2303,7 @@ impl PeerNetwork { ); if !stats.done { match network.inv_sync_run(&mut new_pins, sortdb, nk, stats, inv_state.request_timeout, ibd) { - Ok(d) => d, + Ok(_) => {} Err(net_error::StaleView) => { // stop work on this state machine -- it needs to be restarted. // we'll need to keep scanning. @@ -2310,19 +2311,16 @@ impl PeerNetwork { stats.done = true; inv_state.hint_learned_data = true; inv_state.hint_learned_data_height = u64::MAX; - true } Err(net_error::PeerNotConnected) | Err(net_error::SendError(..)) => { stats.status = NodeStatus::Dead; - true } Err(e) => { debug!( - "{:?}: remote neighbor inv_sync_run finished with error {:?}", - &network.local_peer, &e + "{:?}: remote neighbor inv_sync_run finished with error {e:?}", + &network.local_peer ); stats.status = NodeStatus::Broken; - true } }; @@ -2655,12 +2653,22 @@ impl PeerNetwork { // disconnect and ban broken peers for broken in broken_neighbors.into_iter() { - self.deregister_and_ban_neighbor(&broken); + //substantial changes to the epoch2x sync would be required to get further detail about why the connection was broken. Just use "Unknown" for now. + self.deregister_and_ban_neighbor( + &broken, + DropReason::BrokenConnection("Unknown".into()), + DropSource::Epoch2xInventorySync, + ); } // disconnect from dead connections for dead in dead_neighbors.into_iter() { - self.deregister_neighbor(&dead); + //substantial changes to the epoch2x sync would be required to get further detail about why the connection is dead. Just use "Unknown" for now. + self.deregister_neighbor( + &dead, + DropReason::DeadConnection("Unknown".into()), + DropSource::Epoch2xInventorySync, + ); } (done, throttled) diff --git a/stackslib/src/net/inv/nakamoto.rs b/stackslib/src/net/inv/nakamoto.rs index 9bebbaf642..37abcfbcf8 100644 --- a/stackslib/src/net/inv/nakamoto.rs +++ b/stackslib/src/net/inv/nakamoto.rs @@ -28,10 +28,10 @@ use crate::chainstate::nakamoto::NakamotoChainState; use crate::chainstate::stacks::db::StacksChainState; use crate::net::db::PeerDB; use crate::net::neighbors::comms::PeerNetworkComms; -use crate::net::p2p::PeerNetwork; +use crate::net::p2p::{DropSource, PeerNetwork}; use crate::net::{ - Error as NetError, GetNakamotoInvData, NackErrorCodes, NakamotoInvData, NeighborAddress, - NeighborComms, NeighborKey, StacksMessage, StacksMessageType, + DropNeighbor, DropReason, Error as NetError, GetNakamotoInvData, NackErrorCodes, + NakamotoInvData, NeighborAddress, NeighborComms, NeighborKey, StacksMessage, StacksMessageType, }; use crate::util_lib::db::Error as DBError; @@ -995,7 +995,14 @@ impl NakamotoInvStateMachine { "{:?}: Failed to finish inventory sync to {naddr}: {e:?}", network.get_local_peer() ); - self.comms.add_broken(network, &naddr); + self.comms.add_broken( + network, + &naddr, + DropReason::BrokenConnection(format!( + "Failed to finish inventory sync: {e}" + )), + DropSource::NakamotoInvStateMachine, + ); }) else { continue; @@ -1083,7 +1090,7 @@ impl PeerNetwork { &mut self, sortdb: &SortitionDB, ibd: bool, - ) -> (bool, Vec, Vec) { + ) -> (bool, Vec, Vec) { if self.inv_state_nakamoto.is_none() { self.init_inv_sync_nakamoto(); } @@ -1127,12 +1134,12 @@ impl PeerNetwork { // disconnect and ban broken peers for broken in broken_neighbors.into_iter() { - self.deregister_and_ban_neighbor(&broken); + self.deregister_and_ban_neighbor(&broken.key, broken.reason, broken.source); } // disconnect from dead connections for dead in dead_neighbors.into_iter() { - self.deregister_neighbor(&dead); + self.deregister_neighbor(&dead.key, dead.reason, dead.source); } learned diff --git a/stackslib/src/net/mod.rs b/stackslib/src/net/mod.rs index 946a74d84d..9426fb748b 100644 --- a/stackslib/src/net/mod.rs +++ b/stackslib/src/net/mod.rs @@ -35,6 +35,7 @@ use clarity::vm::{ClarityName, ContractName, Value}; use libstackerdb::{ Error as libstackerdb_error, SlotMetadata, StackerDBChunkAckData, StackerDBChunkData, }; +use p2p::{DropPeer, DropReason, DropSource}; use rand::{thread_rng, RngCore}; use regex::Regex; use rusqlite::types::{ToSql, ToSqlOutput}; @@ -1310,6 +1311,30 @@ pub const BLOCKS_PUSHED_MAX: u32 = 32; // message. pub const NAKAMOTO_BLOCKS_PUSHED_MAX: u32 = 32; +/// Neighbor to drop +#[derive(Clone, Eq, PartialOrd, Ord, Debug)] +pub struct DropNeighbor { + /// the neighbor identifier + pub key: NeighborKey, + /// the reason for dropping the neighbor + pub reason: DropReason, + /// the reason the neighbor should be dropped + pub source: DropSource, +} + +impl Hash for DropNeighbor { + fn hash(&self, state: &mut H) { + // ignores reason and source, we only care about the neighbor key + self.key.hash(state); + } +} + +impl PartialEq for DropNeighbor { + fn eq(&self, other: &DropNeighbor) -> bool { + self.key == other.key + } +} + /// neighbor identifier #[derive(Clone, Eq, PartialOrd, Ord)] pub struct NeighborKey { diff --git a/stackslib/src/net/neighbors/comms.rs b/stackslib/src/net/neighbors/comms.rs index 48759c913d..4c87d9c606 100644 --- a/stackslib/src/net/neighbors/comms.rs +++ b/stackslib/src/net/neighbors/comms.rs @@ -32,8 +32,8 @@ use crate::net::neighbors::{ }; use crate::net::p2p::PeerNetwork; use crate::net::{ - Error as net_error, HandshakeData, Neighbor, NeighborAddress, NeighborKey, PeerAddress, - StacksMessage, StacksMessageType, NUM_NEIGHBORS, + DropNeighbor, DropReason, DropSource, Error as net_error, HandshakeData, Neighbor, + NeighborAddress, NeighborKey, PeerAddress, StacksMessage, StacksMessageType, NUM_NEIGHBORS, }; /// A trait for representing session state for a set of connected neighbors, for the purposes of executing some P2P @@ -53,9 +53,21 @@ pub trait NeighborComms { /// Remove a neighbor from connecting state due to an error fn remove_connecting_error(&mut self, network: &PeerNetwork, nk: &NK); /// Mark a neighbor as dead (inactive, unreachable, etc.) - fn add_dead(&mut self, network: &PeerNetwork, nk: &NK); + fn add_dead( + &mut self, + network: &PeerNetwork, + nk: &NK, + reason: DropReason, + source: DropSource, + ); /// Mark a neighbor as broken (in protocol violation) - fn add_broken(&mut self, network: &PeerNetwork, nk: &NK); + fn add_broken( + &mut self, + network: &PeerNetwork, + nk: &NK, + reason: DropReason, + source: DropSource, + ); /// Pin a connection -- prevent it from getting pruned fn pin_connection(&mut self, event_id: usize); /// Unpin a connection -- allow it to get pruned @@ -79,9 +91,9 @@ pub trait NeighborComms { network: &mut PeerNetwork, ) -> Vec<(NeighborAddress, StacksMessage)>; /// Take all dead neighbors - fn take_dead_neighbors(&mut self) -> HashSet; + fn take_dead_neighbors(&mut self) -> HashSet; /// Take all broken neighbors - fn take_broken_neighbors(&mut self) -> HashSet; + fn take_broken_neighbors(&mut self) -> HashSet; /// Cancel any ongoing requests. Any messages that had been enqueued from /// `add_batch_request()` will not be delivered after this call completes. fn cancel_inflight(&mut self); @@ -106,12 +118,17 @@ pub trait NeighborComms { let msg = network .sign_for_neighbor(&nk, StacksMessageType::Handshake(handshake_data)) - .inspect_err(|_e| { + .inspect_err(|e| { info!( "{:?}: Failed to sign for peer {nk:?}", network.get_local_peer(), ); - self.add_dead(network, &nk); + self.add_dead( + network, + &nk, + DropReason::DeadConnection(format!("Failed to sign message: {e}")), + DropSource::NeighborCommsHandshake, + ); })?; network @@ -123,7 +140,12 @@ pub trait NeighborComms { &nk, &e ); - self.add_dead(network, &nk); + self.add_dead( + network, + &nk, + DropReason::DeadConnection(format!("Not connected: {e}")), + DropSource::NeighborCommsHandshake, + ); net_error::PeerNotConnected }) } @@ -381,7 +403,12 @@ pub trait NeighborComms { Ok(None) } Err(Err(e)) => { - self.add_dead(network, req_nk); + self.add_dead( + network, + req_nk, + DropReason::DeadConnection(format!("Failed to receive message: {e}")), + DropSource::NeighborCommsPoll, + ); Err(e) } }, @@ -425,9 +452,9 @@ pub struct PeerNetworkComms { /// Map of neighbors we're currently trying to connect to (binds their addresses to their event IDs) connecting: HashMap, /// Set of neighbors that died during our comms session - dead_connections: HashSet, + dead_connections: HashSet, /// Set of neighbors who misbehaved during our comms session - broken_connections: HashSet, + broken_connections: HashSet, /// Ongoing batch of p2p requests. Will be `None` if there are no inflight requests. ongoing_batch_request: Option, } @@ -523,12 +550,32 @@ impl NeighborComms for PeerNetworkComms { } } - fn add_dead(&mut self, network: &PeerNetwork, nk: &NK) { - self.dead_connections.insert(nk.to_neighbor_key(network)); + fn add_dead( + &mut self, + network: &PeerNetwork, + nk: &NK, + reason: DropReason, + source: DropSource, + ) { + self.dead_connections.insert(DropNeighbor { + key: nk.to_neighbor_key(network), + reason, + source, + }); } - fn add_broken(&mut self, network: &PeerNetwork, nk: &NK) { - self.broken_connections.insert(nk.to_neighbor_key(network)); + fn add_broken( + &mut self, + network: &PeerNetwork, + nk: &NK, + reason: DropReason, + source: DropSource, + ) { + self.broken_connections.insert(DropNeighbor { + key: nk.to_neighbor_key(network), + reason, + source, + }); } fn pin_connection(&mut self, event_id: usize) { @@ -602,14 +649,12 @@ impl NeighborComms for PeerNetworkComms { self.ongoing_batch_request = None; } - fn take_dead_neighbors(&mut self) -> HashSet { - let dead = mem::replace(&mut self.dead_connections, HashSet::new()); - dead + fn take_dead_neighbors(&mut self) -> HashSet { + mem::replace(&mut self.dead_connections, HashSet::new()) } - fn take_broken_neighbors(&mut self) -> HashSet { - let broken = mem::replace(&mut self.broken_connections, HashSet::new()); - broken + fn take_broken_neighbors(&mut self) -> HashSet { + mem::replace(&mut self.broken_connections, HashSet::new()) } } diff --git a/stackslib/src/net/neighbors/db.rs b/stackslib/src/net/neighbors/db.rs index 3b1d99e906..db09ab434f 100644 --- a/stackslib/src/net/neighbors/db.rs +++ b/stackslib/src/net/neighbors/db.rs @@ -25,10 +25,10 @@ use stacks_common::util::{get_epoch_time_secs, log}; use crate::burnchains::{Address, Burnchain, BurnchainView}; use crate::net::db::PeerDB; use crate::net::neighbors::{NeighborWalkResult, NEIGHBOR_MINIMUM_CONTACT_INTERVAL, NUM_NEIGHBORS}; -use crate::net::p2p::PeerNetwork; +use crate::net::p2p::{DropReason, DropSource, PeerNetwork}; use crate::net::{ - Error as net_error, HandshakeAcceptData, HandshakeData, Neighbor, NeighborAddress, NeighborKey, - Preamble, StackerDBHandshakeData, StacksMessage, + DropNeighbor, Error as net_error, HandshakeAcceptData, HandshakeData, Neighbor, + NeighborAddress, NeighborKey, Preamble, StackerDBHandshakeData, StacksMessage, }; use crate::util_lib::db::{DBConn, DBTx}; @@ -493,7 +493,11 @@ impl NeighborWalkDB for PeerDBNeighborWalk { ); PeerDB::insert_or_replace_peer(&tx, replacement, *slot)?; - result.add_replaced(replaced.addr.clone()); + result.add_replaced(DropNeighbor { + key: replaced.addr.clone(), + reason: DropReason::ReplacedConnection, + source: DropSource::NeighborWalkPeerDB, + }); } } tx.commit()?; diff --git a/stackslib/src/net/neighbors/rpc.rs b/stackslib/src/net/neighbors/rpc.rs index 51ece56bb2..5331af4a16 100644 --- a/stackslib/src/net/neighbors/rpc.rs +++ b/stackslib/src/net/neighbors/rpc.rs @@ -35,9 +35,9 @@ use crate::net::neighbors::{ use crate::net::p2p::PeerNetwork; use crate::net::server::HttpPeer; use crate::net::{ - Error as NetError, HandshakeData, Neighbor, NeighborAddress, NeighborKey, PeerAddress, - PeerHostExtensions, StacksHttpRequest, StacksHttpResponse, StacksMessage, StacksMessageType, - NUM_NEIGHBORS, + DropNeighbor, DropReason, DropSource, Error as NetError, HandshakeData, Neighbor, + NeighborAddress, NeighborKey, PeerAddress, PeerHostExtensions, StacksHttpRequest, + StacksHttpResponse, StacksMessage, StacksMessageType, NUM_NEIGHBORS, }; /// This struct represents a batch of in-flight RPCs to a set of peers, identified by a @@ -45,8 +45,8 @@ use crate::net::{ #[derive(Debug)] pub struct NeighborRPC { state: HashMap)>, - dead: HashSet, - broken: HashSet, + dead: HashSet, + broken: HashSet, } impl NeighborRPC { @@ -59,38 +59,73 @@ impl NeighborRPC { } /// Add a dead neighbor -- a neighbor which failed to communicate with us. - pub fn add_dead(&mut self, network: &PeerNetwork, naddr: &NeighborAddress) { - self.dead.insert(naddr.to_neighbor_key(network)); + pub fn add_dead( + &mut self, + network: &PeerNetwork, + naddr: &NeighborAddress, + reason: DropReason, + source: DropSource, + ) { + self.dead.insert(DropNeighbor { + key: naddr.to_neighbor_key(network), + reason, + source, + }); } /// Add a broken neighbor -- a neighbor which violated protocol. - pub fn add_broken(&mut self, network: &PeerNetwork, naddr: &NeighborAddress) { - self.broken.insert(naddr.to_neighbor_key(network)); + pub fn add_broken( + &mut self, + network: &PeerNetwork, + naddr: &NeighborAddress, + reason: DropReason, + source: DropSource, + ) { + self.broken.insert(DropNeighbor { + key: naddr.to_neighbor_key(network), + reason, + source, + }); } /// Is a neighbor dead? pub fn is_dead(&self, network: &PeerNetwork, naddr: &NeighborAddress) -> bool { - self.dead.contains(&naddr.to_neighbor_key(network)) + // reason and source does't matter. They are ignored by the hasher/partial eq + self.dead.contains(&DropNeighbor { + key: naddr.to_neighbor_key(network), + reason: DropReason::Unknown, + source: DropSource::Unknown, + }) } /// Is a neighbor broken pub fn is_broken(&self, network: &PeerNetwork, naddr: &NeighborAddress) -> bool { - self.broken.contains(&naddr.to_neighbor_key(network)) + // reason and source does't matter. They are ignored by the hasher/partial eq + self.broken.contains(&DropNeighbor { + key: naddr.to_neighbor_key(network), + reason: DropReason::Unknown, + source: DropSource::Unknown, + }) } /// Is a neighbor dead or broken? pub fn is_dead_or_broken(&self, network: &PeerNetwork, naddr: &NeighborAddress) -> bool { - let nk = naddr.to_neighbor_key(network); - self.dead.contains(&nk) || self.broken.contains(&nk) + // reason and source does't matter. They are ignored by the hasher/partial eq + let dn = DropNeighbor { + key: naddr.to_neighbor_key(network), + reason: DropReason::Unknown, + source: DropSource::Unknown, + }; + self.dead.contains(&dn) || self.broken.contains(&dn) } /// Extract the list of dead neighbors - pub fn take_dead(&mut self) -> HashSet { + pub fn take_dead(&mut self) -> HashSet { std::mem::replace(&mut self.dead, HashSet::new()) } /// Extract the list of broken neighbors - pub fn take_broken(&mut self) -> HashSet { + pub fn take_broken(&mut self) -> HashSet { std::mem::replace(&mut self.broken, HashSet::new()) } @@ -122,18 +157,21 @@ impl NeighborRPC { inflight.insert(naddr, (event_id, request_opt)); continue; } - Err(_e) => { + Err(e) => { // declare this neighbor as dead by default - debug!("Failed to poll next reply from {}: {:?}", &naddr, &_e); - dead.push(naddr); + debug!("Failed to poll next reply from {}: {:?}", &naddr, &e); + dead.push(( + naddr, + DropReason::DeadConnection(format!("Failed to poll next reply: {e}")), + )); continue; } }; ret.push((naddr, response)); } - for naddr in dead.into_iter() { - self.add_dead(network, &naddr); + for (naddr, reason) in dead.into_iter() { + self.add_dead(network, &naddr, reason, DropSource::NeighborRPC); } self.state.extend(inflight); ret diff --git a/stackslib/src/net/neighbors/walk.rs b/stackslib/src/net/neighbors/walk.rs index da48ad4ebd..b133e73a11 100644 --- a/stackslib/src/net/neighbors/walk.rs +++ b/stackslib/src/net/neighbors/walk.rs @@ -31,11 +31,11 @@ use crate::net::neighbors::{ NeighborComms, NeighborReplacements, NeighborWalkDB, ToNeighborKey, MAX_NEIGHBOR_BLOCK_DELAY, NEIGHBOR_MINIMUM_CONTACT_INTERVAL, }; -use crate::net::p2p::PeerNetwork; +use crate::net::p2p::{DropReason, DropSource, PeerNetwork}; use crate::net::{ - Error as net_error, HandshakeAcceptData, HandshakeData, MessageSequence, Neighbor, - NeighborAddress, NeighborKey, PeerAddress, Preamble, StackerDBHandshakeData, StacksMessage, - StacksMessageType, NUM_NEIGHBORS, + DropNeighbor, Error as net_error, HandshakeAcceptData, HandshakeData, MessageSequence, + Neighbor, NeighborAddress, NeighborKey, PeerAddress, Preamble, StackerDBHandshakeData, + StacksMessage, StacksMessageType, NUM_NEIGHBORS, }; /// This struct records information from an inbound peer that has authenticated to this node. As @@ -60,12 +60,12 @@ pub struct NeighborWalkResult { /// Newly-added node neighbors pub new_connections: HashSet, /// Dead connections discovered (so we can close their sockets) - pub dead_connections: HashSet, + pub dead_connections: HashSet, /// Connections to misbehaving peers (so we can close their sockets and ban them) - pub broken_connections: HashSet, + pub broken_connections: HashSet, /// Neighbors who got replaced in the PeerDB because they were offline, but mapped to a new /// peer that was online and had the same slot locations - pub replaced_neighbors: HashSet, + pub replaced_neighbors: HashSet, } impl NeighborWalkResult { @@ -82,16 +82,16 @@ impl NeighborWalkResult { self.new_connections.insert(nk); } - pub fn add_broken(&mut self, nk: NeighborKey) { - self.broken_connections.insert(nk); + pub fn add_broken(&mut self, dn: DropNeighbor) { + self.broken_connections.insert(dn); } - pub fn add_dead(&mut self, nk: NeighborKey) { - self.dead_connections.insert(nk); + pub fn add_dead(&mut self, dn: DropNeighbor) { + self.dead_connections.insert(dn); } - pub fn add_replaced(&mut self, nk: NeighborKey) { - self.replaced_neighbors.insert(nk); + pub fn add_replaced(&mut self, dn: DropNeighbor) { + self.replaced_neighbors.insert(dn); } pub fn clear(&mut self) { @@ -777,8 +777,12 @@ impl NeighborWalk { network.get_local_peer(), &self.cur_neighbor.addr ); - self.comms - .add_broken(network, &self.cur_neighbor.addr.clone()); + self.comms.add_broken( + network, + &self.cur_neighbor.addr.clone(), + DropReason::BrokenConnection("Out of sequence message".into()), + DropSource::NeighborWalkHandshake, + ); return Err(net_error::InvalidMessage); } }; @@ -884,8 +888,15 @@ impl NeighborWalk { &self.cur_neighbor.addr, data.error_code ); - self.comms - .add_broken(network, &self.cur_neighbor.addr.clone()); + self.comms.add_broken( + network, + &self.cur_neighbor.addr.clone(), + DropReason::BrokenConnection(format!( + "NACK'ed with error code: {}", + data.error_code + )), + DropSource::NeighborWalkGetNeighbors, + ); return Err(net_error::ConnectionBroken); } _ => { @@ -895,8 +906,12 @@ impl NeighborWalk { network.get_local_peer(), &self.cur_neighbor.addr ); - self.comms - .add_broken(network, &self.cur_neighbor.addr.clone()); + self.comms.add_broken( + network, + &self.cur_neighbor.addr.clone(), + DropReason::BrokenConnection("Out-of-sequence message".into()), + DropSource::NeighborWalkGetNeighbors, + ); return Err(net_error::InvalidMessage); } }; @@ -1140,6 +1155,8 @@ impl NeighborWalk { message.preamble.network_id, &naddr, ), + DropReason::DeadConnection("Handshake rejected".into()), + DropSource::NeighborWalkHandshake, ); continue; } @@ -1158,6 +1175,11 @@ impl NeighborWalk { message.preamble.network_id, &naddr, ), + DropReason::DeadConnection(format!( + "NACK'ed with error code: {}", + data.error_code + )), + DropSource::NeighborWalkHandshake, ); continue; } @@ -1175,6 +1197,8 @@ impl NeighborWalk { message.preamble.network_id, &naddr, ), + DropReason::BrokenConnection("Out-of-sequence message".into()), + DropSource::NeighborWalkHandshake, ); continue; } @@ -1326,7 +1350,12 @@ impl NeighborWalk { _ => { // unexpected reply debug!("{:?}: Neighbor {:?} replied an out-of-sequence message (type {}); assuming broken", network.get_local_peer(), &nkey, message.get_message_name()); - self.comms.add_broken(network, &nkey); + self.comms.add_broken( + network, + &nkey, + DropReason::BrokenConnection("Out-of-sequence message".into()), + DropSource::NeighborWalkGetNeighbors, + ); } } } @@ -1807,21 +1836,30 @@ impl NeighborWalk { nkey, data.error_code ); - self.comms.add_broken(network, &nkey); + self.comms.add_broken( + network, + &nkey, + DropReason::DeadConnection("NACK'ed Handshake".into()), + DropSource::NeighborWalkPing, + ); continue; } _ => { // unexpected reply -- this peer is misbehaving and should be replaced - debug!("{:?}: Neighbor {:?} replied an out-of-sequence message (type {}); will replace", network.get_local_peer(), &nkey, message.get_message_name()); - self.comms.add_broken(network, &nkey); + debug!("{:?}: Neighbor {nkey:?} replied an out-of-sequence message (type {}); will replace", network.get_local_peer(), message.get_message_name()); + self.comms.add_broken( + network, + &nkey, + DropReason::BrokenConnection("Out-of-sequence message".into()), + DropSource::NeighborWalkPing, + ); continue; } }; debug!( - "{:?}: Got HandshakeAccept on pingback from {:?}", - network.get_local_peer(), - &nkey; + "{:?}: Got HandshakeAccept on pingback from {nkey:?}", + network.get_local_peer(); "handshake_data" => ?data, "stackerdb_data" => ?db_data ); diff --git a/stackslib/src/net/p2p.rs b/stackslib/src/net/p2p.rs index 6e2e8ce461..553a041ec1 100644 --- a/stackslib/src/net/p2p.rs +++ b/stackslib/src/net/p2p.rs @@ -274,6 +274,196 @@ impl StacksTipInfo { } } +/// The status of a peer +#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] +pub enum PeerStatus { + /// The peer is connecting + Connecting, + /// The peer is unauthenticated + Unauthenticated, + /// The peer is authenticated + Authenticated, +} + +impl std::fmt::Display for PeerStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PeerStatus::Connecting => write!(f, "Connecting"), + PeerStatus::Unauthenticated => write!(f, "Unauthenticated"), + PeerStatus::Authenticated => write!(f, "Authenticated"), + } + } +} + +/// The reason why a peer should be dropped +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord)] +pub enum DropReason { + /// Unknown + Unknown, + /// The peer has been unresponsive for too long + Unresponsive { + /// The configured timeout in seconds + timeout: u64, + /// The last time the peer was responsive + last_seen: u64, + /// The status of the peer at the time of deregistration + status: PeerStatus, + }, + /// The peer connection is dead + DeadConnection(String), + /// The peer connection is banned + BannedConnection, + /// The peer connection is broken + BrokenConnection(String), + /// The connection was replaced + ReplacedConnection, + /// Too many inbound connections from the same IP + TooManyConnections, + /// Peer's Org has too many members + OrgTooManyMembers, + /// Peer's Org dominates our peer table + OrgDominatesPeerTable, + /// There was a request to drop the peer due to a testing directive + FaultInjection, +} + +impl std::fmt::Display for DropReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DropReason::Unknown => write!(f, "Unknown"), + DropReason::Unresponsive { + timeout, + last_seen, + status, + } => { + write!(f, "{status} Peer was unresponsive for longer than {timeout} seconds: last seen at {last_seen}") + } + DropReason::DeadConnection(msg) => write!(f, "The peer connection is dead: {msg}"), + DropReason::BannedConnection => write!(f, "The peer connection is banned"), + DropReason::BrokenConnection(msg) => write!(f, "The peer connection is broken: {msg}"), + DropReason::ReplacedConnection => write!(f, "The peer connection was replaced"), + DropReason::TooManyConnections => { + write!(f, "Too many inbound connections from the same IP") + } + DropReason::OrgTooManyMembers => write!(f, " The peer's org has too many members"), + DropReason::OrgDominatesPeerTable => { + write!(f, "The peer's org dominates our peer table") + } + DropReason::FaultInjection => { + write!(f, "The peer was dropped due to a testing directive") + } + } + } +} + +/// A peer subystem +#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)] +pub enum DropSource { + /// Unknown source + Unknown, + /// From the peer network + PeerNetwork, + /// From processing a peer network inbound ready socket + PeerNetworkInboundReadySocket, + /// From processing a peer network outbound ready socket + PeerNetworkOutboundReadySocket, + /// From a peer network unresponsive update + PeerNetworkDisconnectUnresponsive, + /// From a peer network block download attempt + PeerNetworkBlockDownload, + /// From a poll from the neighbor comms + NeighborCommsPoll, + /// From a handshake from the neighbor comms + NeighborCommsHandshake, + /// From a handshake from the neighbor walk + NeighborWalkHandshake, + /// From a GetNeighbors request from the neighbor walk + NeighborWalkGetNeighbors, + /// From a Ping request from the neighbor walk + NeighborWalkPing, + /// From the peer DB Neighbor walk + NeighborWalkPeerDB, + /// From an Epoch 2x Inventory Sync + Epoch2xInventorySync, + /// From a Nakamoto inventory sync + NakamotoInventorySync, + /// From a attempt to get the Nakamoto inventory sync + NakamotoGetInventorySync, + /// From the Nakamoto download state machine + NakamotoDownloadStateMachine, + /// From the Nakamoto tenure downloader set + NakamotoTenureDownloader, + /// From the Nakamoto unconfirmed tenure downloader + NakamotoUnconfirmedTenureDownloader, + /// From the Nakamoto inventory state machine + NakamotoInvStateMachine, + /// From the neighbor RPC + NeighborRPC, + /// From a network block download + NetworkBlockDownload, + /// From a getblocks attempt in the block downloader + BlockDownloaderGetBlocks, + /// From a getmicroblocks attempt in the block downloader + BlockDownloaderGetMicroblocks, +} + +impl std::fmt::Display for DropSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DropSource::Unknown => write!(f, "Unknown"), + DropSource::PeerNetwork => write!(f, "PeerNetwork"), + DropSource::PeerNetworkInboundReadySocket => write!(f, "PeerNetworkInboundReadySocket"), + DropSource::PeerNetworkOutboundReadySocket => { + write!(f, "PeerNetworkOutboundReadySocket") + } + DropSource::PeerNetworkDisconnectUnresponsive => { + write!(f, "PeerNetworkDisconnectUnresponsive") + } + DropSource::PeerNetworkBlockDownload => write!(f, "PeerNetworkBlockDownload"), + DropSource::NeighborCommsPoll => write!(f, "NeighborCommsPoll"), + DropSource::NeighborCommsHandshake => write!(f, "NeighborCommsHandshake"), + DropSource::NeighborWalkHandshake => write!(f, "NeighborWalkHandshake"), + DropSource::NeighborWalkGetNeighbors => write!(f, "NeighborWalkGetNeighbors"), + DropSource::NeighborWalkPing => write!(f, "NeighborWalkPing"), + DropSource::NeighborWalkPeerDB => write!(f, "NeighborWalkPeerDB"), + DropSource::Epoch2xInventorySync => write!(f, "Epoch2xInventorySync"), + DropSource::NakamotoInventorySync => write!(f, "NakamotoInventorySync"), + DropSource::NakamotoGetInventorySync => write!(f, "NakamotoGetInventorySync"), + DropSource::NakamotoDownloadStateMachine => write!(f, "NakamotoDownloadStateMachine"), + DropSource::NakamotoTenureDownloader => write!(f, "NakamotoTenureDownloader"), + DropSource::NakamotoUnconfirmedTenureDownloader => { + write!(f, "NakamotoUnconfirmedTenureDownloader") + } + DropSource::NakamotoInvStateMachine => write!(f, "NakamotoInvStateMachine"), + DropSource::NeighborRPC => write!(f, "NeighborRPC"), + DropSource::NetworkBlockDownload => write!(f, "NetworkBlockDownload"), + DropSource::BlockDownloaderGetBlocks => write!(f, "BlockDownloaderGetBlocks"), + DropSource::BlockDownloaderGetMicroblocks => write!(f, "BlockDownloaderGetMicroblocks"), + } + } +} + +/// A helper struct for holding peer drop information +#[derive(Debug, Clone)] +pub struct DropPeer { + /// The reason for dropping the peer + pub reason: DropReason, + /// The address of the peer to drop + pub address: PeerAddress, + /// The subsystem source that is dropping the peer + pub source: DropSource, +} + +impl From<&DropNeighbor> for DropPeer { + fn from(drop_neighbor: &DropNeighbor) -> Self { + DropPeer { + reason: drop_neighbor.reason.clone(), + address: drop_neighbor.key.addrbytes, + source: drop_neighbor.source.clone(), + } + } +} + pub struct PeerNetwork { // constants pub peer_version: u32, @@ -1514,7 +1704,7 @@ impl PeerNetwork { } /// Process ban requests. Update the deny in the peer database. Return the vec of event IDs to disconnect from. - fn process_bans(&mut self) -> Result, net_error> { + fn process_bans(&mut self) -> Result, net_error> { if cfg!(test) && self.connection_opts.disable_network_bans { return Ok(vec![]); } @@ -1547,7 +1737,11 @@ impl PeerNetwork { } }; - disconnect.push(event_id); + disconnect.push(DropPeer { + address: neighbor_key.addrbytes, + reason: DropReason::BannedConnection, + source: DropSource::PeerNetwork, + }); let now = get_epoch_time_secs(); let penalty = if let Some(neighbor_info) = neighbor_info_opt { @@ -1814,7 +2008,7 @@ impl PeerNetwork { Ok(_) => { info!("Neighbor accepted!"; "public key" => ?pubkey_opt, - "address" => %neighbor_key.addrbytes); + "address" => %neighbor_key.addrbytes.pretty_print()); } Err(e) => { debug!( @@ -1890,14 +2084,19 @@ impl PeerNetwork { } /// Deregister a socket/event pair - pub fn deregister_peer(&mut self, event_id: usize) { - debug!("{:?}: Disconnect event {}", &self.local_peer, event_id); + pub fn deregister_peer(&mut self, peer: DropPeer) { + let reason = peer.reason; + debug!( + "{:?}: Disconnect peer {}", + &self.local_peer, + peer.address.pretty_print() + ); - let mut nk_remove: Vec<(NeighborKey, Hash160)> = vec![]; - for (neighbor_key, ev_id) in self.events.iter() { - if *ev_id == event_id { + let mut nk_remove = vec![]; + for (neighbor_key, event_id) in self.events.iter() { + if neighbor_key.addrbytes == peer.address { let pubkh = self - .get_p2p_convo(event_id) + .get_p2p_convo(*event_id) .and_then(|convo| convo.get_public_key_hash()) .unwrap_or(Hash160([0x00; 20])); nk_remove.push((neighbor_key.clone(), pubkh)); @@ -1906,71 +2105,90 @@ impl PeerNetwork { for (nk, pubkh) in nk_remove.into_iter() { // remove event state - self.events.remove(&nk); - info!("Dropping neighbor!"; - "event id" => %event_id, - "public address" => %pubkh, - "public key" => %nk.addrbytes - ); - + if let Some(event_id) = self.events.remove(&nk) { + info!("Dropping neighbor!"; + "event id" => event_id, + "public key" => %pubkh, + "public addr" => nk.addrbytes.pretty_print(), + "reason" => %reason + ); + self.pending_messages.remove(&(event_id, nk.clone())); + self.pending_stacks_messages.remove(&(event_id, nk.clone())); + + match self.network { + None => {} + Some(ref mut network) => { + // deregister socket if connected and registered already + if let Some(socket) = self.sockets.remove(&event_id) { + let _ = network.deregister(event_id, &socket); + } + // deregister socket if still connecting + if let Some(ConnectingPeer { socket, .. }) = + self.connecting.remove(&event_id) + { + let _ = network.deregister(event_id, &socket); + } + } + } + self.relay_handles.remove(&event_id); + self.peers.remove(&event_id); + } // remove inventory state if let Some(inv_state) = self.inv_state.as_mut() { debug!( - "{:?}: Remove inventory state for epoch 2.x {:?}", - &self.local_peer, &nk + "{:?}: Remove inventory state for epoch 2.x {nk:?}", + &self.local_peer ); inv_state.del_peer(&nk); } if let Some(inv_state) = self.inv_state_nakamoto.as_mut() { debug!( - "{:?}: Remove inventory state for Nakamoto {:?}", - &self.local_peer, &nk + "{:?}: Remove inventory state for Nakamoto {nk:?}", + &self.local_peer ); inv_state.del_peer(&NeighborAddress::from_neighbor_key(nk.clone(), pubkh)); } - self.pending_messages.remove(&(event_id, nk.clone())); - self.pending_stacks_messages.remove(&(event_id, nk.clone())); - } - - match self.network { - None => {} - Some(ref mut network) => { - // deregister socket if connected and registered already - if let Some(socket) = self.sockets.remove(&event_id) { - let _ = network.deregister(event_id, &socket); - } - // deregister socket if still connecting - if let Some(ConnectingPeer { socket, .. }) = self.connecting.remove(&event_id) { - let _ = network.deregister(event_id, &socket); - } - } } - - self.relay_handles.remove(&event_id); - self.peers.remove(&event_id); } /// Deregister by neighbor key - pub fn deregister_neighbor(&mut self, neighbor_key: &NeighborKey) { - debug!("Disconnect from {:?}", neighbor_key); - let event_id = match self.events.get(neighbor_key) { - None => { - return; - } - Some(eid) => *eid, + pub fn deregister_neighbor( + &mut self, + neighbor: &NeighborKey, + reason: DropReason, + source: DropSource, + ) { + debug!("Disconnect from {neighbor:?}"); + if !self.events.contains_key(neighbor) { + return; }; - self.deregister_peer(event_id); + self.deregister_peer(DropPeer { + reason, + address: neighbor.addrbytes, + source, + }); } /// Deregister and ban a neighbor - pub fn deregister_and_ban_neighbor(&mut self, neighbor: &NeighborKey) { - debug!("Disconnect from and ban {:?}", neighbor); - if let Some(event_id) = self.events.get(neighbor) { + pub fn deregister_and_ban_neighbor( + &mut self, + neighbor: &NeighborKey, + reason: DropReason, + source: DropSource, + ) { + debug!("Disconnect from and ban {neighbor:?}"); + let event_id = self.events.get(neighbor).map(|event_id| { self.bans.insert(*event_id); - } - + *event_id + }); self.relayer_stats.process_neighbor_ban(neighbor); - self.deregister_neighbor(neighbor); + if event_id.is_some() { + self.deregister_peer(DropPeer { + reason, + address: neighbor.addrbytes, + source, + }); + } } /// Sign a p2p message to be sent to a particular neighbor we're having a conversation with. @@ -2226,7 +2444,7 @@ impl PeerNetwork { dns_client_opt: &mut Option<&mut DNSClient>, poll_state: &mut NetworkPollState, ibd: bool, - ) -> (Vec, HashMap>) { + ) -> (Vec, HashMap>) { let mut to_remove = vec![]; let mut unhandled: HashMap> = HashMap::new(); @@ -2239,25 +2457,44 @@ impl PeerNetwork { ibd, ) { Ok((convo_unhandled, alive)) => (convo_unhandled, alive), - Err(_e) => { + Err(e) => { + let convo = self.get_p2p_convo(*event_id); debug!( - "{:?}: Connection to {:?} failed: {:?}", + "{:?}: Connection to {convo:?} failed: {e:?}", &self.local_peer, - self.get_p2p_convo(*event_id), - &_e ); - to_remove.push(*event_id); + if let Some(convo) = convo { + to_remove.push(DropPeer { + address: convo.peer_addrbytes, + reason: DropReason::BrokenConnection(format!("Connection failed: {e}")), + source: if ibd { + DropSource::PeerNetworkInboundReadySocket + } else { + DropSource::PeerNetworkOutboundReadySocket + }, + }); + } continue; } }; if !alive { + let convo = self.get_p2p_convo(*event_id); debug!( - "{:?}: Connection to {:?} is no longer alive", - &self.local_peer, - self.get_p2p_convo(*event_id), + "{:?}: Connection to {convo:?} is no longer alive", + &self.local_peer ); - to_remove.push(*event_id); + if let Some(convo) = convo { + to_remove.push(DropPeer { + address: convo.peer_addrbytes, + reason: DropReason::DeadConnection("Connection is no longer alive".into()), + source: if ibd { + DropSource::PeerNetworkInboundReadySocket + } else { + DropSource::PeerNetworkOutboundReadySocket + }, + }); + } } // forward along unhandled messages from this peer @@ -2285,15 +2522,15 @@ impl PeerNetwork { /// -- Prune our frontier if it gets too big. fn process_neighbor_walk(&mut self, walk_result: NeighborWalkResult) { for broken in walk_result.broken_connections.iter() { - self.deregister_and_ban_neighbor(broken); + self.deregister_and_ban_neighbor(&broken.key, broken.reason.clone(), broken.source); } for dead in walk_result.dead_connections.iter() { - self.deregister_neighbor(dead); + self.deregister_neighbor(&dead.key, dead.reason.clone(), dead.source); } for replaced in walk_result.replaced_neighbors.iter() { - self.deregister_neighbor(replaced); + self.deregister_neighbor(&replaced.key, replaced.reason.clone(), replaced.source); } // store for later @@ -2359,11 +2596,19 @@ impl PeerNetwork { peer.timestamp + self.connection_opts.timeout, now ); - to_remove.push(*event_id); + to_remove.push(DropPeer { + address: peer.nk.addrbytes, + reason: DropReason::Unresponsive { + timeout: self.connection_opts.timeout, + last_seen: peer.timestamp, + status: PeerStatus::Connecting, + }, + source: DropSource::PeerNetworkDisconnectUnresponsive, + }); } } - for (event_id, convo) in self.peers.iter() { + for convo in self.peers.values() { if convo.is_authenticated() && convo.stats.last_contact_time > 0 { // have handshaked with this remote peer if convo.stats.last_contact_time @@ -2381,7 +2626,16 @@ impl PeerNetwork { self.connection_opts.neighbor_request_timeout, now ); - to_remove.push(*event_id); + + to_remove.push(DropPeer { + address: convo.peer_addrbytes, + reason: DropReason::Unresponsive { + timeout: self.connection_opts.timeout, + last_seen: convo.peer_heartbeat.into(), + status: PeerStatus::Authenticated, + }, + source: DropSource::PeerNetworkDisconnectUnresponsive, + }); } } else { // have not handshaked with this remote peer @@ -2394,14 +2648,23 @@ impl PeerNetwork { self.connection_opts.handshake_timeout, now ); - to_remove.push(*event_id); + + to_remove.push(DropPeer { + address: convo.peer_addrbytes, + reason: DropReason::Unresponsive { + timeout: self.connection_opts.timeout, + last_seen: convo.instantiated, + status: PeerStatus::Unauthenticated, + }, + source: DropSource::PeerNetworkDisconnectUnresponsive, + }); } } } let ret = to_remove.len(); - for event_id in to_remove.into_iter() { - self.deregister_peer(event_id); + for peer in to_remove.into_iter() { + self.deregister_peer(peer); } ret } @@ -2534,7 +2797,7 @@ impl PeerNetwork { /// Flush relayed message handles, but don't block. /// Drop broken handles. /// Return the list of broken conversation event IDs - fn flush_relay_handles(&mut self) -> Vec { + fn flush_relay_handles(&mut self) -> Vec { let mut broken = vec![]; let mut drained = vec![]; @@ -2577,17 +2840,25 @@ impl PeerNetwork { let (num_sent, flushed) = match res { Ok(Ok(x)) => x, - Ok(Err(_)) | Err(_) => { + Ok(Err(e)) | Err(e) => { // connection broken; next list - debug!("Relay handle broken to event {}", event_id); - broken.push(*event_id); + debug!("Relay handle broken to event {event_id}"); + if let Some(peer) = self.peers.get(event_id) { + broken.push(DropPeer { + address: peer.peer_addrbytes, + reason: DropReason::BrokenConnection(format!( + "Relay handle broken: {e}" + )), + source: DropSource::PeerNetwork, + }); + } break; } }; if !flushed && num_sent == 0 { // blocked on this peer's socket - debug!("Relay handle to event {} is blocked", event_id); + debug!("Relay handle to event {event_id} is blocked"); break; } @@ -2712,14 +2983,18 @@ impl PeerNetwork { } /// Disconnect from all peers - fn disconnect_all(&mut self) { - let mut all_event_ids = vec![]; - for (eid, _) in self.peers.iter() { - all_event_ids.push(*eid); - } - - for eid in all_event_ids.into_iter() { - self.deregister_peer(eid); + fn disconnect_all(&mut self, reason: DropReason, source: DropSource) { + let addresses: Vec<_> = self + .peers + .values() + .map(|convo| convo.peer_addrbytes) + .collect(); + for address in addresses { + self.deregister_peer(DropPeer { + address, + reason: reason.clone(), + source: source.clone(), + }); } } @@ -3004,7 +3279,11 @@ impl PeerNetwork { "{:?}: De-register dead/broken neighbor {:?}", &self.local_peer, &broken_neighbor ); - self.deregister_and_ban_neighbor(&broken_neighbor); + self.deregister_and_ban_neighbor( + &broken_neighbor.key, + broken_neighbor.reason, + DropSource::PeerNetworkBlockDownload, + ); } if done && at_chain_tip { @@ -4769,19 +5048,20 @@ impl PeerNetwork { let unauthenticated_inbounds = self.find_unauthenticated_inbound_convos(); // run existing conversations, clear out broken ones, and get back messages forwarded to us - let (error_events, unsolicited_messages) = self.process_ready_sockets( + let (drop_peers, unsolicited_messages) = self.process_ready_sockets( sortdb, chainstate, &mut dns_client_opt, &mut poll_state, ibd, ); - for error_event in error_events { + for peer in drop_peers { debug!( - "{:?}: Failed connection on event {}", - &self.local_peer, error_event + "{:?}: Failed connection on peer {}", + &self.local_peer, + peer.address.pretty_print() ); - self.deregister_peer(error_event); + self.deregister_peer(peer); } // filter out unsolicited messages and buffer up ones that might become processable @@ -4817,13 +5097,14 @@ impl PeerNetwork { // prune back our connections if it's been a while // (only do this if we're done with all other tasks). // Also, process banned peers. - if let Ok(dead_events) = self.process_bans() { - for dead in dead_events.into_iter() { + if let Ok(banned_peers) = self.process_bans() { + for peer in banned_peers.into_iter() { debug!( - "{:?}: Banned connection on event {}", - &self.local_peer, dead + "{:?}: Banned connection on {}", + &self.local_peer, + peer.address.pretty_print() ); - self.deregister_peer(dead); + self.deregister_peer(peer); } } self.prune_connections(); @@ -4867,13 +5148,14 @@ impl PeerNetwork { self.queue_ping_heartbeats(); // move conversations along - let error_events = self.flush_relay_handles(); - for error_event in error_events { + let drop_peers = self.flush_relay_handles(); + for peer in drop_peers { debug!( - "{:?}: Failed connection on event {}", - &self.local_peer, error_event + "{:?}: Failed connection on peer {}", + &self.local_peer, + peer.address.pretty_print(), ); - self.deregister_peer(error_event); + self.deregister_peer(peer); } // is our key about to expire? do we need to re-key? @@ -4913,7 +5195,7 @@ impl PeerNetwork { "{:?}: Fault injection: forcing disconnect", &self.local_peer ); - self.disconnect_all(); + self.disconnect_all(DropReason::FaultInjection, DropSource::PeerNetwork); self.fault_last_disconnect = get_epoch_time_secs(); } } diff --git a/stackslib/src/net/prune.rs b/stackslib/src/net/prune.rs index ac9cb361e5..29312a27d0 100644 --- a/stackslib/src/net/prune.rs +++ b/stackslib/src/net/prune.rs @@ -29,9 +29,9 @@ use crate::net::db::{LocalPeer, PeerDB}; use crate::net::neighbors::*; use crate::net::p2p::*; use crate::net::poll::{NetworkPollState, NetworkState}; -use crate::net::Error as net_error; /// This module contains the logic for pruning client and neighbor connections use crate::net::*; +use crate::net::{DropReason, Error as net_error}; use crate::util_lib::db::{DBConn, Error as db_error}; impl PeerNetwork { @@ -174,7 +174,7 @@ impl PeerNetwork { fn prune_frontier_outbound_orgs( &mut self, preserve: &HashSet, - ) -> Result, net_error> { + ) -> Result, net_error> { let num_outbound = PeerNetwork::count_outbound_conversations(&self.peers); if num_outbound <= self.connection_opts.soft_num_neighbors { return Ok(vec![]); @@ -230,7 +230,7 @@ impl PeerNetwork { &self.local_peer, &neighbor_key, org ); - ret.push(neighbor_key); + ret.push((neighbor_key, DropReason::OrgDominatesPeerTable)); // don't prune too many if num_outbound - (ret.len() as u64) @@ -291,16 +291,15 @@ impl PeerNetwork { ); neighbor_info.remove(0); - ret.push(neighbor_key); + ret.push((neighbor_key, DropReason::OrgTooManyMembers)); } } } debug!( - "{:?}: removed {} outbound peers out of {}", + "{:?}: removed {} outbound peers out of {num_outbound}", &self.local_peer, - ret.len(), - num_outbound + ret.len() ); Ok(ret) } @@ -403,15 +402,19 @@ impl PeerNetwork { pruned_by_ip.len() ); - for prune in pruned_by_ip.iter() { - debug!("{:?}: prune by IP: {:?}", &self.local_peer, prune); - self.deregister_neighbor(prune); + for key in pruned_by_ip.iter() { + debug!( + "{:?}: prune by IP: {:?}", + &self.local_peer, + key.addrbytes.pretty_print() + ); + self.deregister_neighbor(key, DropReason::TooManyConnections, DropSource::PeerNetwork); - if !self.prune_inbound_counts.contains_key(prune) { - self.prune_inbound_counts.insert(prune.clone(), 1); + if !self.prune_inbound_counts.contains_key(key) { + self.prune_inbound_counts.insert(key.clone(), 1); } else { - let c = self.prune_inbound_counts.get(prune).unwrap().to_owned(); - self.prune_inbound_counts.insert(prune.clone(), c + 1); + let c = self.prune_inbound_counts.get(key).unwrap().to_owned(); + self.prune_inbound_counts.insert(key.clone(), c + 1); } } @@ -425,15 +428,19 @@ impl PeerNetwork { pruned_by_org.len() ); - for prune in pruned_by_org.iter() { - debug!("{:?}: prune by Org: {:?}", &self.local_peer, prune); - self.deregister_neighbor(prune); + for (key, reason) in pruned_by_org.iter() { + debug!( + "{:?}: prune by Org: {:?}", + &self.local_peer, + key.addrbytes.pretty_print() + ); + self.deregister_neighbor(key, reason.clone(), DropSource::PeerNetwork); - if !self.prune_outbound_counts.contains_key(prune) { - self.prune_outbound_counts.insert(prune.clone(), 1); + if !self.prune_outbound_counts.contains_key(key) { + self.prune_outbound_counts.insert(key.clone(), 1); } else { - let c = self.prune_outbound_counts.get(prune).unwrap().to_owned(); - self.prune_outbound_counts.insert(prune.clone(), c + 1); + let c = self.prune_outbound_counts.get(key).unwrap().to_owned(); + self.prune_outbound_counts.insert(key.clone(), c + 1); } }