diff --git a/src/protocol/libp2p/kademlia/bucket.rs b/src/protocol/libp2p/kademlia/bucket.rs index 2b1ae87c..3d3ee86a 100644 --- a/src/protocol/libp2p/kademlia/bucket.rs +++ b/src/protocol/libp2p/kademlia/bucket.rs @@ -69,11 +69,17 @@ impl KBucket { } /// Get entry into the bucket. - // TODO: this is horrible code pub fn entry(&mut self, key: Key) -> KBucketEntry<'_> { - for i in 0..self.nodes.len() { - if self.nodes[i].key == key { - return KBucketEntry::Occupied(&mut self.nodes[i]); + let mut replace_candidate = None; + for (index, node) in self.nodes.iter().enumerate() { + // If the node is already present in the k-bucket, return it. + if node.key.as_ref() == key.as_ref() { + return KBucketEntry::Occupied(&mut self.nodes[index]); + } + + // Cache a not-connected node to replace it if necessary. + if node.connection == ConnectionType::NotConnected { + replace_candidate = Some(index); } } @@ -83,17 +89,13 @@ impl KBucket { vec![], ConnectionType::NotConnected, )); - let len = self.nodes.len() - 1; - return KBucketEntry::Vacant(&mut self.nodes[len]); + + let index: usize = self.nodes.len() - 1; + return KBucketEntry::Vacant(&mut self.nodes[index]); } - for i in 0..self.nodes.len() { - match self.nodes[i].connection { - ConnectionType::NotConnected | ConnectionType::CannotConnect => { - return KBucketEntry::Vacant(&mut self.nodes[i]); - } - _ => continue, - } + if let Some(replace_candidate) = replace_candidate { + return KBucketEntry::Vacant(&mut self.nodes[replace_candidate]); } KBucketEntry::NoSlot diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index a46e59f4..5ed02a9c 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -36,6 +36,7 @@ use crate::{ Direction, TransportEvent, TransportService, }, substream::Substream, + transport::Endpoint, types::SubstreamId, PeerId, }; @@ -194,13 +195,37 @@ impl Kademlia { } /// Connection established to remote peer. - fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> { + fn on_connection_established(&mut self, peer: PeerId, endpoint: Endpoint) -> crate::Result<()> { tracing::trace!(target: LOG_TARGET, ?peer, "connection established"); match self.peers.entry(peer) { Entry::Vacant(entry) => { - if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { - entry.connection = ConnectionType::Connected; + match self.routing_table.entry(Key::from(peer)) { + KBucketEntry::Occupied(entry) => { + entry.connection = ConnectionType::Connected; + + // Update the address if not already present. + if !entry.addresses.iter().any(|address| address == endpoint.address()) { + entry.addresses.push(endpoint.address().clone()); + } + } + mut vacant @ KBucketEntry::Vacant(_) => { + // Can only insert a new peer if the routing table update mode is set to + // automatic. + // + // Otherwise, the user is responsible of adding the peer manually if it + // deems necessary. + if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) { + vacant.insert(KademliaPeer::new( + peer, + vec![endpoint.address().clone()], + ConnectionType::Connected, + )); + } + } + entry => { + tracing::debug!(target: LOG_TARGET, ?peer, ?entry, "failed to update routing table on connection"); + } } let Some(actions) = self.pending_dials.remove(&peer) else { @@ -263,6 +288,10 @@ impl Kademlia { }); } + // Don't add the peer to the routing table into a vacant (or already disconnected) entry. + // + // Update the state if the peer could enter the kbucket during `add_known_peer` or + // `on_connection_established`. if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { entry.connection = ConnectionType::NotConnected; } @@ -714,8 +743,8 @@ impl Kademlia { tokio::select! { event = self.service.next() => match event { - Some(TransportEvent::ConnectionEstablished { peer, .. }) => { - if let Err(error) = self.on_connection_established(peer) { + Some(TransportEvent::ConnectionEstablished { peer, endpoint }) => { + if let Err(error) = self.on_connection_established(peer, endpoint) { tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection"); } } diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index 2fcbc172..aef7f5ac 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -166,10 +166,24 @@ impl RoutingTable { match self.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => { + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?connection, + ?entry, + "Peer present into the routing table, overwriting entry", + ); + entry.addresses = addresses; entry.connection = connection; } mut entry @ KBucketEntry::Vacant(_) => { + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?connection, + "Adding peer to the routing table into a vacant entry", + ); entry.insert(KademliaPeer::new(peer, addresses, connection)); } KBucketEntry::LocalNode => tracing::warn!( @@ -445,7 +459,7 @@ mod tests { entry.insert(KademliaPeer::new( peer, vec!["/ip6/::1/tcp/8888".parse().unwrap()], - ConnectionType::CanConnect, + ConnectionType::Connected, )); // verify the node is still there @@ -456,7 +470,7 @@ mod tests { KBucketEntry::Occupied(&mut KademliaPeer::new( peer, addresses, - ConnectionType::CanConnect, + ConnectionType::Connected, )) ); } @@ -497,7 +511,7 @@ mod tests { entry.insert(KademliaPeer::new( peer, vec!["/ip6/::1/tcp/8888".parse().unwrap()], - ConnectionType::CanConnect, + ConnectionType::Connected, )); } diff --git a/src/protocol/libp2p/kademlia/types.rs b/src/protocol/libp2p/kademlia/types.rs index a0542653..cdb7f8e5 100644 --- a/src/protocol/libp2p/kademlia/types.rs +++ b/src/protocol/libp2p/kademlia/types.rs @@ -201,12 +201,6 @@ pub enum ConnectionType { /// Sender is connected to the peer. Connected, - - /// Sender has recently been connected to the peer. - CanConnect, - - /// Sender is unable to connect to the peer. - CannotConnect, } impl TryFrom for ConnectionType { @@ -216,8 +210,6 @@ impl TryFrom for ConnectionType { match value { 0 => Ok(ConnectionType::NotConnected), 1 => Ok(ConnectionType::Connected), - 2 => Ok(ConnectionType::CanConnect), - 3 => Ok(ConnectionType::CannotConnect), _ => Err(()), } } @@ -228,8 +220,6 @@ impl From for i32 { match connection { ConnectionType::NotConnected => 0, ConnectionType::Connected => 1, - ConnectionType::CanConnect => 2, - ConnectionType::CannotConnect => 3, } } }