Skip to content

Commit 602fd5d

Browse files
committed
fix: Add WPeerId and change error handling
1 parent e08e61c commit 602fd5d

File tree

3 files changed

+95
-38
lines changed

3 files changed

+95
-38
lines changed

node/src/service/p2p/bootstrap.rs

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::collections::HashMap;
22
use std::hash::{DefaultHasher, Hash, Hasher};
33
use std::io::{Error, ErrorKind};
4-
use std::str::FromStr;
54
use std::time::Duration;
65

76
use libp2p::{
@@ -15,7 +14,7 @@ use libp2p::{
1514
tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder,
1615
};
1716
use log::{debug, error, info, warn};
18-
use primitives::p2p::PeerInfo;
17+
use primitives::p2p::{PeerInfo, WPeerId};
1918

2019
use crate::service::p2p::{P2PError, DEFAULT_REGISTRATION_TTL};
2120

@@ -26,7 +25,7 @@ pub struct BootstrapBehaviour {
2625
pub rendezvous: rendezvous::server::Behaviour,
2726
pub identify: identify::Behaviour,
2827
pub gossipsub: gossipsub::Behaviour,
29-
pub request_response: request_response::cbor::Behaviour<String, PeerInfo>,
28+
pub request_response: request_response::cbor::Behaviour<WPeerId, PeerInfo>,
3029
}
3130

3231
pub struct BootstrapConfig {
@@ -109,8 +108,10 @@ pub(crate) async fn bootstrap(
109108
info!("Starting P2P bootstrap node at {addr}");
110109
swarm.listen_on(addr)?;
111110
for addr in bootstrap_addresses {
112-
info!("Dialing {addr}");
113-
swarm.dial(addr)?;
111+
info!("Attempting to dial peer at {addr}");
112+
if swarm.dial(addr.clone()).is_err() {
113+
warn!("Failed to dial peer");
114+
}
114115
}
115116
swarm
116117
.behaviour_mut()
@@ -146,11 +147,18 @@ fn on_rendezvous_event(
146147
) {
147148
match event {
148149
rendezvous::server::Event::RegistrationExpired(registration) => {
150+
let id = registration.record.peer_id();
149151
info!(
150152
"Registration for peer {} expired in namespace {}",
151-
registration.record.peer_id(),
152-
registration.namespace
153+
id, registration.namespace
153154
);
155+
// Registration expired, remove entry from hashmap
156+
if registrations.remove(&id).is_none() {
157+
error!(
158+
"Could not remove registration for {:?} because it was not found",
159+
id
160+
);
161+
}
154162
}
155163
rendezvous::server::Event::PeerRegistered { peer, registration } => {
156164
info!(
@@ -161,21 +169,32 @@ fn on_rendezvous_event(
161169
peer_id: peer,
162170
multiaddrs: registration.record.addresses().to_vec(),
163171
};
172+
// Serialize PeerInfo
164173
let encoded_peer_info = match bincode::serialize(&peer_info) {
165174
Ok(info) => info,
166175
Err(..) => {
167176
error!("Failed to serialize peer_info");
168177
return;
169178
}
170179
};
171-
registrations.insert(peer, peer_info);
180+
registrations
181+
.entry(peer)
182+
.and_modify(|info| {
183+
for addr in peer_info.multiaddrs.iter() {
184+
if !info.multiaddrs.contains(addr) {
185+
info.multiaddrs.push(addr.clone())
186+
}
187+
}
188+
} )
189+
.or_insert(peer_info);
190+
// Send registration information to other bootstrap nodes.
172191
match swarm
173192
.behaviour_mut()
174193
.gossipsub
175194
.publish(IdentTopic::new(GOSSIP_TOPIC), encoded_peer_info)
176195
{
177-
Ok(..) => info!("Successfully published new peer info"),
178-
Err(..) => error!("Failed to publish new peer info"),
196+
Ok(..) => info!("Successfully published new peer info for peer {peer}"),
197+
Err(..) => error!("Failed to publish new peer info for peer {peer}"),
179198
}
180199
}
181200
other => debug!("Encountered other rendezvous event: {other:?}"),
@@ -189,6 +208,7 @@ fn on_gossipsub_event(
189208
registrations: &mut HashMap<PeerId, PeerInfo>,
190209
) {
191210
match event {
211+
// Received a message with peer information from another bootstrap node.
192212
gossipsub::Event::Message {
193213
propagation_source: peer_id,
194214
message_id: id,
@@ -208,7 +228,16 @@ fn on_gossipsub_event(
208228
"Got registration: {:?} with id: {} from peer: {:?}",
209229
peer_info, id, peer_id
210230
);
211-
registrations.insert(peer_info.peer_id, peer_info);
231+
registrations
232+
.entry(peer_info.peer_id)
233+
.and_modify(|info| {
234+
for addr in peer_info.multiaddrs.iter() {
235+
if !info.multiaddrs.contains(addr) {
236+
info.multiaddrs.push(addr.clone())
237+
}
238+
}
239+
} )
240+
.or_insert(peer_info);
212241
}
213242
other => debug!("Encountered other gossipsub event: {other:?}"),
214243
}
@@ -217,10 +246,11 @@ fn on_gossipsub_event(
217246
/// Handles events within the request_response protocol
218247
fn on_request_response_event(
219248
swarm: &mut Swarm<BootstrapBehaviour>,
220-
event: request_response::Event<String, PeerInfo>,
249+
event: request_response::Event<WPeerId, PeerInfo>,
221250
registrations: &HashMap<PeerId, PeerInfo>,
222251
) {
223252
match event {
253+
// Message received, looking up the mapping
224254
request_response::Event::Message { peer, message } => {
225255
if let Message::Request {
226256
request,
@@ -229,22 +259,17 @@ fn on_request_response_event(
229259
} = message
230260
{
231261
info!("Got request with id {request_id} from {peer}");
232-
let id = match PeerId::from_str(&request) {
233-
Ok(id) => id,
234-
Err(..) => {
235-
error!("Received invalid peer ID");
236-
return;
237-
}
238-
};
239-
info!("Looking up {id:?}");
262+
let id: PeerId = request.into();
240263
match registrations.get(&id) {
241264
Some(peer_info) => {
265+
// Sending the peer information back to the client who opened the channel.
242266
if swarm
243267
.behaviour_mut()
244268
.request_response
245269
.send_response(channel, peer_info.clone())
246270
.is_err()
247271
{
272+
// Could add retries here.
248273
error!("Failed to send peer info to {peer:?}");
249274
}
250275
}
@@ -263,7 +288,7 @@ fn on_request_response_event(
263288
error,
264289
} => warn!("Failed to receive message with id {request_id} from {peer}: {error}"),
265290
request_response::Event::ResponseSent { peer, request_id } => {
266-
info!("Request with id {request_id} sent to {peer}")
291+
debug!("Request with id {request_id} sent to {peer}")
267292
}
268293
}
269294
}

primitives/src/p2p.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,25 @@ fn serialize_peer_id<S: Serializer>(id: &PeerId, serializer: S) -> Result<S::Ok,
4343
let id = id.to_string();
4444
serializer.collect_str(&id)
4545
}
46+
47+
/// Wrapper struct for Peer ID so we can implement `Deserialize` and `Serialize`
48+
/// and use this type in the request response behaviour.
49+
#[cfg(feature = "std")]
50+
#[derive(Clone, Debug, Serialize, Deserialize)]
51+
pub struct WPeerId(
52+
#[serde(serialize_with = "serialize_peer_id")]
53+
#[serde(deserialize_with = "deserialize_peer_id")]
54+
PeerId,
55+
);
56+
57+
impl From<PeerId> for WPeerId {
58+
fn from(value: PeerId) -> Self {
59+
Self(value)
60+
}
61+
}
62+
63+
impl Into<PeerId> for WPeerId {
64+
fn into(self) -> PeerId {
65+
self.0
66+
}
67+
}

storage-provider/client/examples/peer-resolver.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,19 @@
88
//! to the bootstrap node.
99
use std::time::Duration;
1010

11-
use anyhow::Result;
11+
use anyhow::{bail, Result};
1212
use clap::Parser;
1313
use libp2p::futures::StreamExt;
1414
use libp2p::request_response::{Message, ProtocolSupport};
1515
use libp2p::swarm::SwarmEvent;
1616
use libp2p::{
1717
noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder,
1818
};
19-
use primitives::p2p::PeerInfo;
19+
use primitives::p2p::{PeerInfo, WPeerId};
2020
use tracing_subscriber::EnvFilter;
2121

2222
/// Create a discovery swarm
23-
fn create_discover_swarm() -> Result<Swarm<request_response::cbor::Behaviour<String, PeerInfo>>>
24-
{
23+
fn create_discover_swarm() -> Result<Swarm<request_response::cbor::Behaviour<WPeerId, PeerInfo>>> {
2524
let swarm = SwarmBuilder::with_new_identity()
2625
.with_tokio()
2726
.with_tcp(
@@ -45,7 +44,7 @@ fn create_discover_swarm() -> Result<Swarm<request_response::cbor::Behaviour<Str
4544

4645
/// Run the discovery swarm and request the peer ID to multiaddrs mapping.
4746
async fn run_discover(
48-
mut swarm: Swarm<request_response::cbor::Behaviour<String, PeerInfo>>,
47+
mut swarm: Swarm<request_response::cbor::Behaviour<WPeerId, PeerInfo>>,
4948
bootstrap_addr: Multiaddr,
5049
bootstrap_id: &PeerId,
5150
resolve_id: PeerId,
@@ -56,26 +55,37 @@ async fn run_discover(
5655
tokio::select! {
5756
event = swarm.select_next_some() => match event {
5857
SwarmEvent::Behaviour(event) => match event {
59-
libp2p::request_response::Event::Message { peer, message } =>
60-
if let Message::Response { request_id, response } = message {
61-
tracing::info!("Received response with id {request_id} from {peer}");
62-
return Ok(response);
63-
},
64-
libp2p::request_response::Event::OutboundFailure {
58+
request_response::Event::Message { peer, message } => {
59+
if let Message::Response {
60+
request_id,
61+
response,
62+
} = message
63+
{
64+
tracing::info!("Received response with id {request_id} from {peer}");
65+
return Ok(response);
66+
}
67+
}
68+
request_response::Event::OutboundFailure {
6569
peer,
6670
request_id,
6771
error,
68-
} => tracing::warn!("Failed to send message with id {request_id} to {peer}: {error}"),
69-
libp2p::request_response::Event::InboundFailure {
72+
} => {
73+
tracing::error!("Failed to send message with id {request_id} to {peer}: {error}");
74+
bail!("Failed to send message with id {request_id} to {peer}: {error}");
75+
}
76+
request_response::Event::InboundFailure {
7077
peer,
7178
request_id,
7279
error,
73-
} => tracing::warn!("Failed to receive message with id {request_id} from {peer}: {error}"),
74-
libp2p::request_response::Event::ResponseSent { peer, request_id } => tracing::info!("Request with id {request_id} sent to {peer}"),
80+
} => {
81+
tracing::error!("Failed to receive message with id {request_id} from {peer}: {error}");
82+
bail!("Failed to receive message with id {request_id} from {peer}: {error}")
83+
}
84+
other => tracing::debug!("Unreachable event: {other:?}")
7585
},
7686
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
7787
tracing::info!("Connected to {}", peer_id);
78-
swarm.behaviour_mut().send_request(bootstrap_id, resolve_id.to_string());
88+
swarm.behaviour_mut().send_request(bootstrap_id, resolve_id.into());
7989
}
8090
other => tracing::debug!("Received other event: {other:?}"),
8191
}
@@ -107,7 +117,7 @@ async fn main() -> Result<()> {
107117
let peer_info =
108118
run_discover(swarm, cli.bootstrap_addr, &cli.bootstrap_id, cli.resolve_id).await?;
109119
println!(
110-
"Got multiaddrs {:?} for peer {:?}",
120+
"Got multiaddrs {:#?} for peer {:?}",
111121
peer_info.multiaddrs, peer_info.peer_id
112122
);
113123
Ok(())

0 commit comments

Comments
 (0)