From ccc3a57aeb4431b213b3ec8ec210dc8cb8259152 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 29 Oct 2024 14:33:58 +1100 Subject: [PATCH] Automatic Firewall/NAT Detection (#271) * Initial work towards firewall checking * Checking connectivity by awaiting incoming connections * Correct timeout handling * Testing * Finished testing * fmt * clippy * Inform peers of ENR update --- Cargo.toml | 9 +- src/config.rs | 31 ++ src/metrics.rs | 14 +- src/service.rs | 557 ++++++++++++++++-------------- src/service/connectivity_state.rs | 179 ++++++++++ src/service/test.rs | 19 +- 6 files changed, 549 insertions(+), 260 deletions(-) create mode 100644 src/service/connectivity_state.rs diff --git a/Cargo.toml b/Cargo.toml index 98e0b0218..3c1c33049 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,14 @@ categories = ["network-programming", "asynchronous"] exclude = [".gitignore", ".github/*"] [dependencies] -enr = { version = "0.12", features = ["k256", "ed25519"] } +enr = { git = "https://github.com/sigp/enr", branch = "remove-fields", features = [ + "k256", + "ed25519", +] } # enr = { version = "0.12", features = ["k256", "ed25519"] } tokio = { version = "1", features = ["net", "sync", "macros", "rt"] } libp2p-identity = { version = "0.2", features = [ - "ed25519", - "secp256k1", + "ed25519", + "secp256k1", ], optional = true } multiaddr = { version = "0.18", optional = true } zeroize = { version = "1", features = ["zeroize_derive"] } diff --git a/src/config.rs b/src/config.rs index f33cf84bb..87d71b758 100644 --- a/src/config.rs +++ b/src/config.rs @@ -92,6 +92,18 @@ pub struct Config { /// will last indefinitely. Default is 1 hour. pub ban_duration: Option, + /// Auto-discovering our IP address, is only one part in discovering our NAT/firewall + /// situation. We need to determine if we are behind a firewall that is preventing incoming + /// connections (this is especially true for IPv6 where all connections will report the same + /// external IP). To do this, Discv5 uses a heuristic, which is that after we set an address in + /// our ENR, we wait for this duration to see if we have any incoming connections. If we + /// receive a single INCOMING connection in this duration, we consider ourselves contactable, + /// until we update or change our IP address again. If we fail to receive an incoming + /// connection in this duration, we revoke our ENR address advertisement for 6 hours, before + /// trying again. This can be set to None, to always advertise and never revoke. The default is + /// Some(5 minutes). + pub auto_nat_listen_duration: Option, + /// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with /// timing support. By default, the executor that created the discv5 struct will be used. pub executor: Option>, @@ -141,6 +153,7 @@ impl ConfigBuilder { filter_max_bans_per_ip: Some(5), permit_ban_list: PermitBanList::default(), ban_duration: Some(Duration::from_secs(3600)), // 1 hour + auto_nat_listen_duration: Some(Duration::from_secs(300)), // 5 minutes executor: None, listen_config, }; @@ -295,6 +308,24 @@ impl ConfigBuilder { self } + /// Auto-discovering our IP address, is only one part in discovering our NAT/firewall + /// situation. We need to determine if we are behind a firewall that is preventing incoming + /// connections (this is especially true for IPv6 where all connections will report the same + /// external IP). 
To do this, Discv5 uses a heuristic, which is that after we set an address in + /// our ENR, we wait for this duration to see if we have any incoming connections. If we + /// receive a single INCOMING connection in this duration, we consider ourselves contactable, + /// until we update or change our IP address again. If we fail to receive an incoming + /// connection in this duration, we revoke our ENR address advertisement for 6 hours, before + /// trying again. This can be set to None, to always advertise and never revoke. The default is + /// Some(5 minutes). + pub fn auto_nat_listen_duration( + &mut self, + auto_nat_listen_duration: Option, + ) -> &mut Self { + self.config.auto_nat_listen_duration = auto_nat_listen_duration; + self + } + /// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with /// timing support. pub fn executor(&mut self, executor: Box) -> &mut Self { diff --git a/src/metrics.rs b/src/metrics.rs index 2e6f2fc9a..ac48128a6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; lazy_static! { pub static ref METRICS: InternalMetrics = InternalMetrics::default(); @@ -16,6 +16,10 @@ pub struct InternalMetrics { pub bytes_sent: AtomicUsize, /// The number of bytes received. pub bytes_recv: AtomicUsize, + /// Whether we consider ourselves contactable or not on ipv4. + pub ipv4_contactable: AtomicBool, + /// Whether we consider ourselves contactable or not on ipv6. + pub ipv6_contactable: AtomicBool, } impl Default for InternalMetrics { @@ -26,6 +30,8 @@ impl Default for InternalMetrics { unsolicited_requests_per_window: AtomicUsize::new(0), bytes_sent: AtomicUsize::new(0), bytes_recv: AtomicUsize::new(0), + ipv4_contactable: AtomicBool::new(false), + ipv6_contactable: AtomicBool::new(false), } } } @@ -55,6 +61,10 @@ pub struct Metrics { pub bytes_sent: usize, /// The number of bytes received. pub bytes_recv: usize, + /// Whether we consider ourselves contactable or not. + pub ipv4_contactable: bool, + /// Whether we consider ourselves contactable or not. + pub ipv6_contactable: bool, } impl From<&METRICS> for Metrics { @@ -67,6 +77,8 @@ impl From<&METRICS> for Metrics { / internal_metrics.moving_window as f64, bytes_sent: internal_metrics.bytes_sent.load(Ordering::Relaxed), bytes_recv: internal_metrics.bytes_recv.load(Ordering::Relaxed), + ipv4_contactable: internal_metrics.ipv4_contactable.load(Ordering::Relaxed), + ipv6_contactable: internal_metrics.ipv6_contactable.load(Ordering::Relaxed), } } } diff --git a/src/service.rs b/src/service.rs index 8b4093985..2661f8463 100644 --- a/src/service.rs +++ b/src/service.rs @@ -31,6 +31,9 @@ use crate::{ }, rpc, Config, Enr, Event, IpMode, }; +use connectivity_state::{ + ConnectivityState, TimerFailure, DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT, +}; use delay_map::HashSetDelay; use enr::{CombinedKey, NodeId}; use fnv::FnvHashMap; @@ -49,6 +52,7 @@ use std::{ use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, trace, warn}; +mod connectivity_state; mod ip_vote; mod query_info; mod test; @@ -170,52 +174,41 @@ use crate::discv5::PERMIT_BAN_LIST; pub struct Service { /// Configuration parameters. config: Config, - /// The local ENR of the server. local_enr: Arc>, - /// The key associated with the local ENR. enr_key: Arc>, - /// Storage of the ENR record for each node. kbuckets: Arc>>, - /// All the iterative queries we are currently performing. 
queries: QueryPool, - /// RPC requests that have been sent and are awaiting a response. Some requests are linked to a /// query. active_requests: FnvHashMap, - /// Keeps track of the number of responses received from a NODES response. active_nodes_responses: HashMap, - /// A map of votes nodes have made about our external IP address. We accept the majority. ip_votes: Option, - /// The channel to send messages to the handler. handler_send: mpsc::UnboundedSender, - /// The channel to receive messages from the handler. handler_recv: mpsc::Receiver, - /// The exit channel to shutdown the handler. handler_exit: Option>, - /// The channel of messages sent by the controlling discv5 wrapper. discv5_recv: mpsc::Receiver, - /// The exit channel for the service. exit: oneshot::Receiver<()>, - /// A queue of peers that require regular ping to check connectivity. peers_to_ping: HashSetDelay, - /// A channel that the service emits events on. event_stream: Option>, - - // Type of socket we are using + /// Type of socket we are using ip_mode: IpMode, + /// This stores information about whether we think we have open ports and if we are externally + /// contactable or not. This decides if we should update our ENR or set it to None, if we are + /// not contactable. + connectivity_state: ConnectivityState, } /// Active RPC request awaiting a response from the handler. @@ -300,6 +293,8 @@ impl Service { let (discv5_send, discv5_recv) = mpsc::channel(30); let (exit_send, exit) = oneshot::channel(); + let connectivity_state = ConnectivityState::new(config.auto_nat_listen_duration); + config .executor .clone() @@ -322,6 +317,7 @@ impl Service { exit, config: config.clone(), ip_mode, + connectivity_state, }; info!(mode = ?service.ip_mode, "Discv5 Service started"); @@ -378,8 +374,8 @@ impl Service { Some(event) = self.handler_recv.recv() => { match event { HandlerOut::Established(enr, socket_addr, direction) => { - self.send_event(Event::SessionEstablished(enr.clone(), socket_addr)); - self.inject_session_established(enr, direction); + self.inject_session_established(enr.clone(), &socket_addr, direction); + self.send_event(Event::SessionEstablished(enr, socket_addr)); } HandlerOut::Request(node_address, request) => { self.handle_rpc_request(node_address, *request); @@ -470,6 +466,38 @@ impl Service { self.send_ping(enr, None); } } + connectivity_timeout = self.connectivity_state.poll() => { + let updated_enr = match connectivity_timeout { + TimerFailure::V4 => { + // We have not received enough incoming connections in the required + // time. Remove our ENR advertisement. + info!(ip_version="v4", next_attempt_in=%DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT.as_secs(), "UDP Socket removed from ENR"); + if let Err(error) = self.local_enr.write().remove_udp_socket(&self.enr_key.read()) { + error!(?error, "Failed to update the ENR"); + false + } else { + // ENR was updated + true + } + } + TimerFailure::V6 => { + // We have not received enough incoming connections in the required + // time. Remove our ENR advertisement. 
+ info!(ip_version="v6", next_attempt_in=%DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT.as_secs(), "UDP Socket removed from ENR"); + if let Err(error) = self.local_enr.write().remove_udp6_socket(&self.enr_key.read()) { + error!(?error, "Failed to update the ENR"); + false + } else { + // ENR was updated + true + } + } + }; + if updated_enr { + // Inform our known peers of our updated ENR + self.ping_connected_peers(); + } + } } } } @@ -656,276 +684,292 @@ impl Service { // verify we know of the rpc_id let id = response.id.clone(); - if let Some(mut active_request) = self.active_requests.remove(&id) { - debug!( - response = %response.body, - request = %active_request.request_body, - from = %active_request.contact, - "Received RPC response", + let Some(mut active_request) = self.active_requests.remove(&id) else { + warn!(%id, "Received an RPC response which doesn't match a request"); + return; + }; + + debug!( + response = %response.body, + request = %active_request.request_body, + from = %active_request.contact, + "Received RPC response", + ); + // Check that the responder matches the expected request + + let expected_node_address = active_request.contact.node_address(); + if expected_node_address != node_address { + debug_unreachable!("Handler returned a response not matching the used socket addr"); + return error!( + expected = %expected_node_address, + received = %node_address, + request_id = %id, + "Received a response from an unexpected address", ); + } - // Check that the responder matches the expected request + if !response.match_request(&active_request.request_body) { + warn!( + %node_address, + "Node gave an incorrect response type. Ignoring response" + ); + return; + } - let expected_node_address = active_request.contact.node_address(); - if expected_node_address != node_address { - debug_unreachable!("Handler returned a response not matching the used socket addr"); - return error!( - expected = %expected_node_address, - received = %node_address, - request_id = %id, - "Received a response from an unexpected address", - ); - } + let node_id = node_address.node_id; - if !response.match_request(&active_request.request_body) { - warn!( - %node_address, - "Node gave an incorrect response type. 
Ignoring response" - ); - return; - } + match response.body { + ResponseBody::Nodes { total, mut nodes } => { + if total > MAX_NODES_RESPONSES as u64 { + warn!( + total, + "NodesResponse has a total larger than {}, nodes will be truncated", + MAX_NODES_RESPONSES + ); + } - let node_id = node_address.node_id; + // These are sanitized and ordered + let distances_requested = match &active_request.request_body { + RequestBody::FindNode { distances } => distances, + _ => unreachable!(), + }; - match response.body { - ResponseBody::Nodes { total, mut nodes } => { - if total > MAX_NODES_RESPONSES as u64 { - warn!( - total, - "NodesResponse has a total larger than {}, nodes will be truncated", - MAX_NODES_RESPONSES - ); + if let Some(CallbackResponse::Nodes(callback)) = active_request.callback.take() { + if let Err(e) = callback.send(Ok(nodes)) { + warn!(error = ?e, "Failed to send response in callback") } + return; + } - // These are sanitized and ordered - let distances_requested = match &active_request.request_body { - RequestBody::FindNode { distances } => distances, - _ => unreachable!(), - }; + // Filter out any nodes that are not of the correct distance + let peer_key: kbucket::Key = node_id.into(); - if let Some(CallbackResponse::Nodes(callback)) = active_request.callback.take() - { - if let Err(e) = callback.send(Ok(nodes)) { - warn!(error = ?e, "Failed to send response in callback") - } - return; + // The distances we send are sanitized an ordered. + // We never send an ENR request in combination of other requests. + if distances_requested.len() == 1 && distances_requested[0] == 0 { + // we requested an ENR update + if nodes.len() > 1 { + warn!( + %node_address, + "Peer returned more than one ENR for itself. Blacklisting", + ); + let ban_timeout = self.config.ban_duration.map(|v| Instant::now() + v); + PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); + nodes.retain(|enr| peer_key.log2_distance(&enr.node_id().into()).is_none()); } - - // Filter out any nodes that are not of the correct distance - let peer_key: kbucket::Key = node_id.into(); - - // The distances we send are sanitized an ordered. - // We never send an ENR request in combination of other requests. - if distances_requested.len() == 1 && distances_requested[0] == 0 { - // we requested an ENR update - if nodes.len() > 1 { - warn!( - %node_address, - "Peer returned more than one ENR for itself. Blacklisting", - ); - let ban_timeout = self.config.ban_duration.map(|v| Instant::now() + v); - PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - nodes.retain(|enr| { - peer_key.log2_distance(&enr.node_id().into()).is_none() - }); - } - } else { - let before_len = nodes.len(); - nodes.retain(|enr| { - peer_key - .log2_distance(&enr.node_id().into()) - .map(|distance| distances_requested.contains(&distance)) - .unwrap_or_else(|| false) - }); - - if nodes.len() < before_len { - // Peer sent invalid ENRs. Blacklist the Node - let node_id = active_request.contact.node_id(); - let addr = active_request.contact.socket_addr(); - warn!(%node_id, %addr, "ENRs received of unsolicited distances. Blacklisting"); - let ban_timeout = self.config.ban_duration.map(|v| Instant::now() + v); - PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - } + } else { + let before_len = nodes.len(); + nodes.retain(|enr| { + peer_key + .log2_distance(&enr.node_id().into()) + .map(|distance| distances_requested.contains(&distance)) + .unwrap_or_else(|| false) + }); + + if nodes.len() < before_len { + // Peer sent invalid ENRs. 
Blacklist the Node + let node_id = active_request.contact.node_id(); + let addr = active_request.contact.socket_addr(); + warn!(%node_id, %addr, "ENRs received of unsolicited distances. Blacklisting"); + let ban_timeout = self.config.ban_duration.map(|v| Instant::now() + v); + PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); } + } - // handle the case that there is more than one response - if total > 1 { - let mut current_response = - self.active_nodes_responses.remove(&id).unwrap_or_default(); - - debug!( - "Nodes Response: {} of {} received", - current_response.count, total - ); - // If there are more requests coming, store the nodes and wait for - // another response - // If we have already received all our required nodes, drop any extra - // rpc messages. - if current_response.received_nodes.len() < self.config.max_nodes_response - && (current_response.count as u64) < total - && current_response.count < MAX_NODES_RESPONSES - { - current_response.count += 1; + // handle the case that there is more than one response + if total > 1 { + let mut current_response = + self.active_nodes_responses.remove(&id).unwrap_or_default(); - current_response.received_nodes.append(&mut nodes); - self.active_nodes_responses - .insert(id.clone(), current_response); - self.active_requests.insert(id, active_request); - return; - } + debug!( + "Nodes Response: {} of {} received", + current_response.count, total + ); + // If there are more requests coming, store the nodes and wait for + // another response + // If we have already received all our required nodes, drop any extra + // rpc messages. + if current_response.received_nodes.len() < self.config.max_nodes_response + && (current_response.count as u64) < total + && current_response.count < MAX_NODES_RESPONSES + { + current_response.count += 1; - // have received all the Nodes responses we are willing to accept - // ignore duplicates here as they will be handled when adding - // to the DHT current_response.received_nodes.append(&mut nodes); - nodes = current_response.received_nodes; + self.active_nodes_responses + .insert(id.clone(), current_response); + self.active_requests.insert(id, active_request); + return; } - debug!( - len = nodes.len(), - total, - from = %active_request.contact, - "Received a nodes response", - ); - // note: If a peer sends an initial NODES response with a total > 1 then - // in a later response sends a response with a total of 1, all previous nodes - // will be ignored. - // ensure any mapping is removed in this rare case - self.active_nodes_responses.remove(&id); + // have received all the Nodes responses we are willing to accept + // ignore duplicates here as they will be handled when adding + // to the DHT + current_response.received_nodes.append(&mut nodes); + nodes = current_response.received_nodes; + } + + debug!( + len = nodes.len(), + total, + from = %active_request.contact, + "Received a nodes response", + ); + // note: If a peer sends an initial NODES response with a total > 1 then + // in a later response sends a response with a total of 1, all previous nodes + // will be ignored. 
+ // ensure any mapping is removed in this rare case + self.active_nodes_responses.remove(&id); - self.discovered(&node_id, nodes, active_request.query_id); + self.discovered(&node_id, nodes, active_request.query_id); + } + ResponseBody::Pong { enr_seq, ip, port } => { + // Send the response to the user, if they are who asked + if let Some(CallbackResponse::Pong(callback)) = active_request.callback { + let response = Pong { + enr_seq, + ip, + port: port.get(), + }; + if let Err(e) = callback.send(Ok(response)) { + warn!(error = ?e, "Failed to send callback response") + }; + return; } - ResponseBody::Pong { enr_seq, ip, port } => { - // Send the response to the user, if they are who asked - if let Some(CallbackResponse::Pong(callback)) = active_request.callback { - let response = Pong { - enr_seq, - ip, - port: port.get(), + + let socket = SocketAddr::new(ip, port.get()); + // Register the vote, this counts towards potentially updating the ENR for external + // advertisement + self.handle_ip_vote_from_pong(node_id, socket); + + // check if we need to request a new ENR + if let Some(enr) = self.find_enr(&node_id) { + if enr.seq() < enr_seq { + // request an ENR update + debug!(from = %active_request.contact, "Requesting an ENR update"); + let request_body = RequestBody::FindNode { distances: vec![0] }; + let active_request = ActiveRequest { + contact: active_request.contact, + request_body, + query_id: None, + callback: None, }; + self.send_rpc_request(active_request); + } + // Only update the routing table if the new ENR is contactable + if self.ip_mode.get_contactable_addr(&enr).is_some() { + self.connection_updated(node_id, ConnectionStatus::PongReceived(enr)); + } + } + } + ResponseBody::Talk { response } => { + // Send the response to the user + match active_request.callback { + Some(CallbackResponse::Talk(callback)) => { if let Err(e) = callback.send(Ok(response)) { warn!(error = ?e, "Failed to send callback response") }; - } else { - let socket = SocketAddr::new(ip, port.get()); - // perform ENR majority-based update if required. + } + _ => error!("Invalid callback for response"), + } + } + } + } + + // We have received a PONG which informs us for our external socket. This function decides + // how we should handle this vote and whether or not to update our ENR. This is done on a + // majority-based voting system, see `IpVote` for more details. + fn handle_ip_vote_from_pong(&mut self, node_id: NodeId, socket: SocketAddr) { + // Check that we are in a state to handle any IP votes + if !self.connectivity_state.should_count_ip_vote(&socket) { + return; + } - // Only count votes that from peers we have contacted. - let key: kbucket::Key = node_id.into(); - let should_count = matches!( + // Only count votes that are from peers we have contacted. 
+ let key: kbucket::Key = node_id.into(); + let is_connected_and_outgoing = matches!( self.kbuckets.write().entry(&key), kbucket::Entry::Present(_, status) if status.is_connected() && !status.is_incoming()); - if should_count | self.require_more_ip_votes(socket.is_ipv6()) { - // get the advertised local addresses - let (local_ip4_socket, local_ip6_socket) = { - let local_enr = self.local_enr.read(); - (local_enr.udp4_socket(), local_enr.udp6_socket()) - }; - - if let Some(ref mut ip_votes) = self.ip_votes { - ip_votes.insert(node_id, socket); - let (maybe_ip4_majority, maybe_ip6_majority) = ip_votes.majority(); - - let new_ip4 = maybe_ip4_majority.and_then(|majority| { - if Some(majority) != local_ip4_socket { - Some(majority) - } else { - None - } - }); - let new_ip6 = maybe_ip6_majority.and_then(|majority| { - if Some(majority) != local_ip6_socket { - Some(majority) - } else { - None - } - }); - - if new_ip4.is_some() || new_ip6.is_some() { - let mut updated = false; - - // Check if our advertised IPV6 address needs to be updated. - if let Some(new_ip6) = new_ip6 { - let new_ip6: SocketAddr = new_ip6.into(); - let result = self - .local_enr - .write() - .set_udp_socket(new_ip6, &self.enr_key.read()); - match result { - Ok(_) => { - updated = true; - info!(%new_ip6, "Local UDP ip6 socket updated"); - self.send_event(Event::SocketUpdated(new_ip6)); - } - Err(e) => { - warn!(ip6 = %new_ip6, error = ?e, "Failed to update local UDP ip6 socket."); - } - } - } - if let Some(new_ip4) = new_ip4 { - let new_ip4: SocketAddr = new_ip4.into(); - let result = self - .local_enr - .write() - .set_udp_socket(new_ip4, &self.enr_key.read()); - match result { - Ok(_) => { - updated = true; - info!(%new_ip4, "Local UDP socket updated"); - self.send_event(Event::SocketUpdated(new_ip4)); - } - Err(e) => { - warn!(ip = %new_ip4, error = ?e, "Failed to update local UDP socket."); - } - } - } - if updated { - self.ping_connected_peers(); - } - } - } - } + // Check to make sure this is an outgoing peer vote, otherwise if we need the vote due to a + // lack of minority, we accept it. 
+ if !(is_connected_and_outgoing | self.require_more_ip_votes(socket.is_ipv6())) { + return; + } - // check if we need to request a new ENR - if let Some(enr) = self.find_enr(&node_id) { - if enr.seq() < enr_seq { - // request an ENR update - debug!(from = %active_request.contact, "Requesting an ENR update"); - let request_body = RequestBody::FindNode { distances: vec![0] }; - let active_request = ActiveRequest { - contact: active_request.contact, - request_body, - query_id: None, - callback: None, - }; - self.send_rpc_request(active_request); + match socket { + SocketAddr::V4(_) => { + let local_ip4_socket = self.local_enr.read().udp4_socket(); + if let Some(ip_votes) = self.ip_votes.as_mut() { + ip_votes.insert(node_id, socket); + let maybe_ip4_majority = ip_votes.majority().0; + + let new_ip4 = maybe_ip4_majority.and_then(|majority| { + if Some(majority) != local_ip4_socket { + Some(majority) + } else { + None + } + }); + + // If we have a new ipv4 majority + if let Some(new_ip4) = new_ip4 { + let new_ip4: SocketAddr = new_ip4.into(); + let result = self + .local_enr + .write() + .set_udp_socket(new_ip4, &self.enr_key.read()); + match result { + Ok(_) => { + // Inform the connectivity state that we have updated our IP advertisement + self.connectivity_state.enr_socket_update(&new_ip4); + info!(ip_version="v4", %new_ip4, "Local UDP socket updated"); + self.send_event(Event::SocketUpdated(new_ip4)); + self.ping_connected_peers(); } - // Only update the routing table if the new ENR is contactable - if self.ip_mode.get_contactable_addr(&enr).is_some() { - self.connection_updated( - node_id, - ConnectionStatus::PongReceived(enr), - ); + Err(e) => { + warn!(ip = %new_ip4, error = ?e, "Failed to update local UDP socket."); } } } } - ResponseBody::Talk { response } => { - // Send the response to the user - match active_request.callback { - Some(CallbackResponse::Talk(callback)) => { - if let Err(e) = callback.send(Ok(response)) { - warn!(error = ?e, "Failed to send callback response") - }; + } + SocketAddr::V6(_) => { + let local_ip6_socket = self.local_enr.read().udp6_socket(); + if let Some(ip_votes) = self.ip_votes.as_mut() { + ip_votes.insert(node_id, socket); + let maybe_ip6_majority = ip_votes.majority().1; + + let new_ip6 = maybe_ip6_majority.and_then(|majority| { + if Some(majority) != local_ip6_socket { + Some(majority) + } else { + None + } + }); + // Check if our advertised IPV6 address needs to be updated. + if let Some(new_ip6) = new_ip6 { + let new_ip6: SocketAddr = new_ip6.into(); + let result = self + .local_enr + .write() + .set_udp_socket(new_ip6, &self.enr_key.read()); + match result { + Ok(_) => { + // Inform the connectivity state that we have updated our IP advertisement + self.connectivity_state.enr_socket_update(&new_ip6); + info!(ip_version="v6", %new_ip6, "Local UDP socket updated"); + self.send_event(Event::SocketUpdated(new_ip6)); + self.ping_connected_peers(); + } + Err(e) => { + warn!(ip6 = %new_ip6, error = ?e, "Failed to update local UDP ip6 socket."); + } } - _ => error!("Invalid callback for response"), } } } - } else { - warn!(%id, "Received an RPC response which doesn't match a request"); } } @@ -1428,7 +1472,18 @@ impl Service { /// The equivalent of libp2p `inject_connected()` for a udp session. We have no stream, but a /// session key-pair has been negotiated. 
-    fn inject_session_established(&mut self, enr: Enr, connection_direction: ConnectionDirection) {
+    fn inject_session_established(
+        &mut self,
+        enr: Enr,
+        socket: &SocketAddr,
+        connection_direction: ConnectionDirection,
+    ) {
+        // Inform the connectivity state that an incoming peer has connected to us. This could
+        // establish that our externally advertised address is contactable.
+        if matches!(connection_direction, ConnectionDirection::Incoming) {
+            self.connectivity_state.received_incoming_connection(socket);
+        }
+
         // Ignore sessions with non-contactable ENRs
         if self.ip_mode.get_contactable_addr(&enr).is_none() {
             return;
diff --git a/src/service/connectivity_state.rs b/src/service/connectivity_state.rs
new file mode 100644
index 000000000..f7f23d310
--- /dev/null
+++ b/src/service/connectivity_state.rs
@@ -0,0 +1,179 @@
+//! This keeps track of whether we should advertise an external IP address or not, based on
+//! whether we think we are externally contactable.
+//!
+//! We determine this by advertising our discovered IP address; if we then receive inbound
+//! connections, we know we are externally contactable. If we see nothing for a period of time, we
+//! consider ourselves non-contactable and revoke our advertised IP address. We wait for
+//! DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT before trying again.
+//!
+//! The process works via the following:
+//! 1. Our ENR socket gets updated.
+//! 2. This triggers us to set an incoming wait timer.
+//! 3. a. If we receive an incoming connection within this time, we consider ourselves contactable
+//! and we remove the timer.
+//! 3. b. If we don't receive a connection and the timer expires, we set our external ENR address
+//! to None and set the `next_connectivity_test` to DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT in
+//! the future. This will prevent counting votes until this time, which prevents our ENR from
+//! being updated.
+
+use crate::metrics::METRICS;
+use futures::{
+    future::{pending, Either},
+    FutureExt,
+};
+use std::{
+    net::SocketAddr,
+    pin::Pin,
+    sync::atomic::Ordering,
+    time::{Duration, Instant},
+};
+use tokio::time::{sleep, Sleep};
+use tracing::info;
+
+pub const DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT: Duration = Duration::from_secs(21600); // 6 hours
+
+/// The number of incoming connections we need to observe before we consider ourselves
+/// contactable. A previously disconnected node reconnecting back through a temporarily open port
+/// can be a false positive. The higher this number, the lower the probability of false positives.
+const NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID: usize = 2;
+
+/// The error returned from polling the ConnectivityState, indicating whether IPv4 or IPv6 has
+/// failed a connectivity check.
+pub enum TimerFailure {
+    /// IPv4 timer failure.
+    V4,
+    /// IPv6 timer failure.
+    V6,
+}
+
+pub(crate) struct ConnectivityState {
+    /// The duration we will wait for incoming connections before deciding if we are contactable
+    /// or not. If this is None, we consider ourselves always contactable.
+    duration_for_incoming_connections: Option<Duration>,
+    /// If we are awaiting incoming connections, this timer expires when we stop waiting.
+    ipv4_incoming_wait_time: Option<Pin<Box<Sleep>>>,
+    /// If we are awaiting incoming connections, this timer expires when we stop waiting.
+    ipv6_incoming_wait_time: Option<Pin<Box<Sleep>>>,
+    /// The time at which we begin checking connectivity tests for ipv4.
+    pub ipv4_next_connectivity_test: Instant,
+    /// The time at which we begin checking connectivity tests for ipv6.
+    pub ipv6_next_connectivity_test: Instant,
+    /// The number of incoming ipv4 nodes we have seen during our awaiting window.
+    ipv4_incoming_count: usize,
+    /// The number of incoming ipv6 nodes we have seen during our awaiting window.
+    ipv6_incoming_count: usize,
+}
+
+impl ConnectivityState {
+    pub fn new(duration_for_incoming_connections: Option<Duration>) -> Self {
+        ConnectivityState {
+            duration_for_incoming_connections,
+            ipv4_incoming_wait_time: None,
+            ipv6_incoming_wait_time: None,
+            ipv4_next_connectivity_test: Instant::now(),
+            ipv6_next_connectivity_test: Instant::now(),
+            ipv4_incoming_count: 0,
+            ipv6_incoming_count: 0,
+        }
+    }
+
+    /// Checks if we are in a state to handle new IP votes. If we are waiting to do a connectivity
+    /// test for this specific IP kind, this returns false.
+    pub fn should_count_ip_vote(&self, socket: &SocketAddr) -> bool {
+        // If this configuration is not set, we just accept all votes and disable this
+        // functionality.
+        if self.duration_for_incoming_connections.is_none() {
+            return true;
+        }
+
+        // If we have failed a connectivity test, then we wait until the next duration window
+        // before counting new votes.
+        match socket {
+            SocketAddr::V4(_) => Instant::now() >= self.ipv4_next_connectivity_test,
+            SocketAddr::V6(_) => Instant::now() >= self.ipv6_next_connectivity_test,
+        }
+    }
+
+    /// We have updated our external ENR socket. If enabled (i.e. `duration_for_incoming_connections`
+    /// is not None) we start a timer to await any kind of incoming connection, which would verify
+    /// that we are contactable. If we receive nothing within `duration_for_incoming_connections`,
+    /// we consider ourselves non-contactable.
+    pub fn enr_socket_update(&mut self, socket: &SocketAddr) {
+        if let Some(duration_to_wait) = self.duration_for_incoming_connections {
+            match socket {
+                SocketAddr::V4(_) => {
+                    self.ipv4_incoming_count = 0;
+                    self.ipv4_incoming_wait_time = Some(Box::pin(sleep(duration_to_wait)))
+                }
+                SocketAddr::V6(_) => {
+                    self.ipv6_incoming_count = 0;
+                    self.ipv6_incoming_wait_time = Some(Box::pin(sleep(duration_to_wait)))
+                }
+            }
+        }
+    }
+
+    // We have received an incoming connection. If we were awaiting a connection, we remove the
+    // expiry timer and we are done. The ENR will remain advertised and new votes will still count
+    // to potentially change the IP address if a legitimate change occurs.
+    pub fn received_incoming_connection(&mut self, socket: &SocketAddr) {
+        match socket {
+            SocketAddr::V4(_) => {
+                if self.ipv4_incoming_wait_time.is_none() {
+                    // We are not waiting for any v4 connections
+                    return;
+                }
+                self.ipv4_incoming_count += 1;
+                if self.ipv4_incoming_count >= NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID {
+                    info!(ip_version = "v4", "We are contactable");
+                    self.ipv4_incoming_wait_time = None;
+                    METRICS.ipv4_contactable.store(true, Ordering::Relaxed);
+                }
+            }
+            SocketAddr::V6(_) => {
+                if self.ipv6_incoming_wait_time.is_none() {
+                    // We are not waiting for any v6 connections
+                    return;
+                }
+                self.ipv6_incoming_count += 1;
+                if self.ipv6_incoming_count >= NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID {
+                    info!(ip_version = "v6", "We are contactable");
+                    self.ipv6_incoming_wait_time = None;
+                    METRICS.ipv6_contactable.store(true, Ordering::Relaxed);
+                }
+            }
+        }
+    }
+
+    pub async fn poll(&mut self) -> TimerFailure {
+        let ipv4_fired = match (
+            self.ipv4_incoming_wait_time.as_mut(),
+            self.ipv6_incoming_wait_time.as_mut(),
+        ) {
+            (Some(ipv4_sleep), Some(ipv6_sleep)) => {
+                match futures::future::select(ipv4_sleep, ipv6_sleep).await {
+                    Either::Left(_) => true,
+                    Either::Right(_) => false, // Ipv6 fired
+                }
+            }
+            (Some(ipv4_sleep), None) => ipv4_sleep.map(|_| true).await,
+            (None, Some(ipv6_sleep)) => ipv6_sleep.map(|_| false).await,
+            (None, None) => pending().await,
+        };
+
+        if ipv4_fired {
+            self.ipv4_next_connectivity_test =
+                Instant::now() + DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT;
+            self.ipv4_incoming_wait_time = None;
+            METRICS.ipv4_contactable.store(false, Ordering::Relaxed);
+            TimerFailure::V4
+        } else {
+            // Ipv6 fired
+            self.ipv6_next_connectivity_test =
+                Instant::now() + DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT;
+            self.ipv6_incoming_wait_time = None;
+            METRICS.ipv6_contactable.store(false, Ordering::Relaxed);
+            TimerFailure::V6
+        }
+    }
+}
diff --git a/src/service/test.rs b/src/service/test.rs
index 128ad9e07..a46681d82 100644
--- a/src/service/test.rs
+++ b/src/service/test.rs
@@ -92,6 +92,8 @@ async fn build_service(
     let (_discv5_send, discv5_recv) = mpsc::channel(30);
     let (_exit_send, exit) = oneshot::channel();
 
+    let connectivity_state = ConnectivityState::new(config.auto_nat_listen_duration);
+
     Service {
         local_enr,
         enr_key,
@@ -109,6 +111,7 @@
         exit,
         config,
         ip_mode: Default::default(),
+        connectivity_state,
     }
 }
 
@@ -150,6 +153,8 @@ fn build_non_handler_service(
     let (_discv5_send, discv5_recv) = mpsc::channel(30);
     let (_exit_send, exit) = oneshot::channel();
 
+    let connectivity_state = ConnectivityState::new(config.auto_nat_listen_duration);
+
     let service = Service {
         local_enr,
         enr_key,
@@ -167,6 +172,7 @@
         exit,
         config,
         ip_mode: IpMode::DualStack,
+        connectivity_state,
     };
     (service, handler_recv_fake, handler_send_fake)
 }
@@ -267,14 +273,15 @@ async fn test_connection_direction_on_inject_session_established() {
     let key = &kbucket::Key::from(enr2.node_id());
 
+    let dummy_socket = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 80);
     // Test that the existing connection direction is not updated.
     // Incoming
-    service.inject_session_established(enr2.clone(), ConnectionDirection::Incoming);
+    service.inject_session_established(enr2.clone(), &dummy_socket, ConnectionDirection::Incoming);
     let status = service.kbuckets.read().iter_ref().next().unwrap().status;
     assert!(status.is_connected());
     assert_eq!(ConnectionDirection::Incoming, status.direction);
 
-    service.inject_session_established(enr2.clone(), ConnectionDirection::Outgoing);
+    service.inject_session_established(enr2.clone(), &dummy_socket, ConnectionDirection::Outgoing);
     let status = service.kbuckets.read().iter_ref().next().unwrap().status;
     assert!(status.is_connected());
     assert_eq!(ConnectionDirection::Incoming, status.direction);
@@ -286,7 +293,7 @@ async fn test_connection_direction_on_inject_session_established() {
         Some(ConnectionDirection::Outgoing),
     );
     assert!(matches!(result, UpdateResult::Updated));
-    service.inject_session_established(enr2.clone(), ConnectionDirection::Incoming);
+    service.inject_session_established(enr2.clone(), &dummy_socket, ConnectionDirection::Incoming);
     let status = service.kbuckets.read().iter_ref().next().unwrap().status;
     assert!(status.is_connected());
     assert_eq!(ConnectionDirection::Outgoing, status.direction);
@@ -460,6 +467,8 @@ async fn test_ipv6_update_amongst_ipv4_dominated_network() {
 
     // Load up the routing table with 100 random ENRs.
 
+    let dummy_socket = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 80);
+
     for _ in 0..100 {
         let key = CombinedKey::generate_secp256k1();
         let ip = generate_rand_ipv4();
@@ -470,7 +479,7 @@ async fn test_ipv6_update_amongst_ipv4_dominated_network() {
             .unwrap();
         let direction = random_connection_direction();
 
-        service.inject_session_established(enr.clone(), direction);
+        service.inject_session_established(enr.clone(), &dummy_socket, direction);
     }
 
     // Attempt to add 10 IPv6 nodes and expect that we attempt to send 10 PING's to IPv6 nodes.
@@ -484,7 +493,7 @@
             .unwrap();
         let direction = ConnectionDirection::Outgoing;
 
-        service.inject_session_established(enr.clone(), direction);
+        service.inject_session_established(enr.clone(), &dummy_socket, direction);
     }
 
     // Collect all the messages to the handler and count the PING requests for ENR v6 addresses.
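
Reviewer note (not part of the patch): a rough usage sketch of the new knob. It assumes the crate's existing `ListenConfig`, `ConfigBuilder`, `Discv5::new` and `Discv5::metrics()` entry points; only `auto_nat_listen_duration` and the `ipv4_contactable`/`ipv6_contactable` metric fields are introduced by this change.

```rust
use std::net::Ipv4Addr;
use std::time::Duration;

use discv5::enr::CombinedKey;
use discv5::{ConfigBuilder, Discv5, Enr, ListenConfig};

// Hypothetical helper showing how a caller might opt into a longer NAT-detection window.
// The surrounding API (ListenConfig, Discv5::new, metrics()) is the crate's existing
// interface and is assumed here; only the builder call below is added by this patch.
fn build_discv5(local_enr: Enr, enr_key: CombinedKey) -> Result<Discv5, &'static str> {
    let listen_config = ListenConfig::Ipv4 {
        ip: Ipv4Addr::UNSPECIFIED,
        port: 9000,
    };

    let config = ConfigBuilder::new(listen_config)
        // Wait 10 minutes for an incoming connection after advertising a new socket,
        // instead of the 5-minute default; `None` would disable revocation entirely.
        .auto_nat_listen_duration(Some(Duration::from_secs(600)))
        .build();

    let discv5 = Discv5::new(local_enr, enr_key, config)?;

    // The new metric fields report whether the node currently believes it is reachable.
    let metrics = discv5.metrics();
    println!(
        "contactable: ipv4={} ipv6={}",
        metrics.ipv4_contactable, metrics.ipv6_contactable
    );

    Ok(discv5)
}
```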