diff --git a/Cargo.lock b/Cargo.lock index 4f9a32404..2d74941e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2233,6 +2233,15 @@ dependencies = [ "serde", ] +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.5" @@ -6058,6 +6067,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-ticker" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e" +dependencies = [ + "futures", + "futures-timer", + "instant", +] + [[package]] name = "futures-timer" version = "3.0.3" @@ -6415,6 +6435,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hickory-proto" version = "0.24.2" @@ -7964,6 +7990,7 @@ dependencies = [ "libp2p-connection-limits 0.4.0", "libp2p-core 0.42.0", "libp2p-dns 0.42.0", + "libp2p-gossipsub", "libp2p-identify 0.45.0", "libp2p-identity", "libp2p-mdns 0.46.0", @@ -7971,6 +7998,7 @@ dependencies = [ "libp2p-noise 0.45.0", "libp2p-quic 0.11.1", "libp2p-rendezvous", + "libp2p-request-response 0.27.0", "libp2p-swarm 0.45.1", "libp2p-tcp 0.42.0", "libp2p-upnp 0.3.0", @@ -8117,6 +8145,37 @@ dependencies = [ "tracing", ] +[[package]] +name = "libp2p-gossipsub" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4e830fdf24ac8c444c12415903174d506e1e077fbe3875c404a78c5935a8543" +dependencies = [ + "asynchronous-codec 0.7.0", + "base64 0.22.1", + "byteorder", + "bytes", + "either", + "fnv", + "futures", + "futures-ticker", + "getrandom", + "hex_fmt", + "libp2p-core 0.42.0", + "libp2p-identity", + "libp2p-swarm 0.45.1", + "prometheus-client 0.22.3", + "quick-protobuf", + "quick-protobuf-codec 0.3.1", + "rand", + "regex", + "sha2 0.10.8", + "smallvec", + "tracing", + "void", + "web-time", +] + [[package]] name = "libp2p-identify" version = "0.43.1" @@ -8279,6 +8338,7 @@ checksum = "77ebafa94a717c8442d8db8d3ae5d1c6a15e30f2d347e0cd31d057ca72e42566" dependencies = [ "futures", "libp2p-core 0.42.0", + "libp2p-gossipsub", "libp2p-identify 0.45.0", "libp2p-identity", "libp2p-swarm 0.45.1", @@ -8453,6 +8513,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6" dependencies = [ "async-trait", + "cbor4ii 0.3.3", "futures", "futures-bounded 0.2.4", "futures-timer", @@ -8460,6 +8521,7 @@ dependencies = [ "libp2p-identity", "libp2p-swarm 0.45.1", "rand", + "serde", "smallvec", "tracing", "void", @@ -13868,6 +13930,7 @@ dependencies = [ "substrate-frame-rpc-system", "substrate-prometheus-endpoint", "thiserror 2.0.8", + "tokio", ] [[package]] @@ -19424,7 +19487,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493" dependencies = [ - "cbor4ii", + "cbor4ii 0.2.14", "ipld-core", "scopeguard", "serde", diff --git a/Cargo.toml b/Cargo.toml index 08ca43651..91264623b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ async-trait = "0.1.80" axum = "0.7.5" base64 = "0.22.1" beetswap = "0.4.0" +bincode = "1.3.3" bitflags = "2.5.0" blake2b_simd = { version = "1.0.2", default-features = false } blockstore = "0.7.1" diff --git a/Justfile b/Justfile index 69b55cd50..9ca0d1066 100644 --- a/Justfile +++ b/Justfile @@ -28,10 +28,17 @@ release-testnet: # run the testnet without building run-testnet: mkdir -p /tmp/zombienet - openssl genpkey -algorithm ED25519 -out /tmp/zombienet/private.pem - openssl pkey -in /tmp/zombienet/private.pem -pubout -out /tmp/zombienet/public.pem # Generate public key so script can get the Peer ID + openssl genpkey -algorithm ED25519 -out /tmp/zombienet/charlie-private.pem + openssl pkey -in /tmp/zombienet/charlie-private.pem -pubout -out /tmp/zombienet/charlie-public.pem # Generate public key so script can get the Peer ID zombienet -p native spawn zombienet/local-testnet.toml +# Run a single collator +run-collator: + mkdir -p /tmp/zombienet + openssl genpkey -algorithm ED25519 -out /tmp/zombienet/david-private.pem + openssl pkey -in /tmp/zombienet/david-private.pem -pubout -out /tmp/zombienet/david-public.pem # Generate public key so script can get the Peer ID + zombienet -p native spawn zombienet/local-david-collator.toml + # Run the testing building it before testnet: release-testnet run-testnet @@ -165,8 +172,8 @@ load-to-minikube: kube-testnet: mkdir -p /tmp/zombienet - openssl genpkey -algorithm ED25519 -out /tmp/zombienet/private.pem - openssl pkey -in /tmp/zombienet/private.pem -pubout -out /tmp/zombienet/public.pem # Generate public key so script can get the Peer ID + openssl genpkey -algorithm ED25519 -out /tmp/zombienet/charlie-private.pem + openssl pkey -in /tmp/zombienet/charlie-private.pem -pubout -out /tmp/zombienet/charlie-public.pem # Generate public key so script can get the Peer ID zombienet -p kubernetes spawn zombienet/local-kube-testnet.toml # The tarpaulin calls for test coverage have the following options: diff --git a/examples/start_sp.sh b/examples/start_sp.sh index 1e0aa19bf..f1a9a0499 100755 --- a/examples/start_sp.sh +++ b/examples/start_sp.sh @@ -13,7 +13,7 @@ PROVIDER="//Charlie" P2P_ADDRESS="/ip4/127.0.0.1/tcp/62649" P2P_PUBLIC_KEY="/tmp/polka-storage-provider/public.pem" P2P_PRIVATE_KEY="/tmp/polka-storage-provider/private.pem" -P2P_BOOTSTRAP_PUBLIC_KEY="/tmp/zombienet/public.pem" +P2P_BOOTSTRAP_PUBLIC_KEY="/tmp/zombienet/charlie-public.pem" # Config file location CONFIG="/tmp/polka-storage-provider/config.toml" diff --git a/node/Cargo.toml b/node/Cargo.toml index b06999723..04d4fde7f 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -34,8 +34,19 @@ frame-benchmarking = { workspace = true, default-features = true } frame-benchmarking-cli = { workspace = true, default-features = true } futures = { workspace = true } jsonrpsee = { features = ["server"], workspace = true } -libp2p = { workspace = true, features = ["identify", "macros", "noise", "rendezvous", "tcp", "tokio", "yamux"] } -log = { workspace = true, default-features = true } +libp2p = { workspace = true, features = [ + "cbor", + "gossipsub", + "identify", + "macros", + "noise", + "rendezvous", + "request-response", + "tcp", + "tokio", + "yamux", +] } +log = { workspace = true, features = ["kv"], default-features = true } pallet-transaction-payment-rpc = { workspace = true, default-features = true } polkadot-cli = { features = ["rococo-native"], workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } @@ -69,6 +80,7 @@ sp-runtime = { workspace = true, default-features = true } sp-timestamp = { workspace = true, default-features = true } substrate-frame-rpc-system = { workspace = true, default-features = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["macros"] } xcm.workspace = true [build-dependencies] diff --git a/node/src/cli.rs b/node/src/cli.rs index 7ca019150..296a2338f 100644 --- a/node/src/cli.rs +++ b/node/src/cli.rs @@ -129,4 +129,8 @@ pub struct RunCmd { /// that the bootstrap node binds to. #[arg(long, required = false)] pub p2p_listen_address: Option, + + /// List of other bootstrap nodes + #[arg(long, required = false, num_args = 1..)] + pub bootstrap_addresses: Option>, } diff --git a/node/src/command.rs b/node/src/command.rs index 9dd0ee670..58990b21d 100644 --- a/node/src/command.rs +++ b/node/src/command.rs @@ -238,7 +238,13 @@ pub fn run() -> Result<()> { .ok_or( "This node is configured as authority, so it will be used as Bootstrap node for Storage Provider & Collator network, but the listen address is missing. Set the --p2p-listen-address argument of the node." )?; - Some(BootstrapConfig::new(p2p_key, p2p_listen_address)) + let bootstrap_addresses = cli + .run + .bootstrap_addresses + .ok_or( + "This node is configured as authority, so it will be used as Bootstrap node for Storage Provider & Collator network, but the bootstrap addresses are missing. Set the --bootstrap-addresses argument of the node." + )?; + Some(BootstrapConfig::new(p2p_key, p2p_listen_address, bootstrap_addresses)) } else { None }; diff --git a/node/src/service/p2p/bootstrap.rs b/node/src/service/p2p/bootstrap.rs index 11f339889..f417f1d84 100644 --- a/node/src/service/p2p/bootstrap.rs +++ b/node/src/service/p2p/bootstrap.rs @@ -1,34 +1,54 @@ -use std::time::Duration; +use std::{ + collections::HashMap, + hash::{DefaultHasher, Hash, Hasher}, + io::{Error, ErrorKind}, + time::Duration, +}; use libp2p::{ futures::StreamExt, + gossipsub::{self, IdentTopic}, identify, identity::Keypair, noise, rendezvous, + request_response::{self, Message, ProtocolSupport}, swarm::{NetworkBehaviour, SwarmEvent}, - tcp, yamux, Multiaddr, Swarm, SwarmBuilder, + tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder, +}; +use log::{debug, error, info, warn}; +use primitives::p2p::{ + PeerIdRequest, PeerInfo, PeerInfoResponse, DEFAULT_REGISTRATION_TTL, GOSSIP_TOPIC, + IDENTIFY_PROTOCOL_VERSION, REQUEST_RESPONSE_STREAM_PROTOCOL, }; -use log::{debug, info}; -use crate::service::p2p::{P2PError, DEFAULT_REGISTRATION_TTL}; +use crate::service::p2p::P2PError; #[derive(NetworkBehaviour)] pub struct BootstrapBehaviour { pub rendezvous: rendezvous::server::Behaviour, pub identify: identify::Behaviour, + pub gossipsub: gossipsub::Behaviour, + pub request_response: request_response::cbor::Behaviour, } pub struct BootstrapConfig { address: Multiaddr, keypair: Keypair, + bootstrap_addresses: Vec, } impl BootstrapConfig { - pub fn new(keypair: Keypair, address: Multiaddr) -> Self { - Self { address, keypair } + pub fn new(keypair: Keypair, address: Multiaddr, bootstrap_addresses: Vec) -> Self { + Self { + address, + keypair, + bootstrap_addresses, + } } - pub fn create_swarm(self) -> Result<(Swarm, Multiaddr), P2PError> { + pub fn create_swarm( + self, + ) -> Result<(Swarm, Multiaddr, Vec), P2PError> { let swarm = SwarmBuilder::with_existing_identity(self.keypair) .with_tokio() .with_tcp( @@ -37,22 +57,47 @@ impl BootstrapConfig { yamux::Config::default, ) .map_err(|_| P2PError::InvalidTcpConfig)? - .with_behaviour(|key| BootstrapBehaviour { - // Rendezvous server behaviour for serving new peers to connecting nodes. - rendezvous: rendezvous::server::Behaviour::new( - rendezvous::server::Config::default().with_max_ttl(DEFAULT_REGISTRATION_TTL), // Max TTL of 24 hours - ), - // The identify behaviour is used to share the external address and the public key with connecting clients. - identify: identify::Behaviour::new(identify::Config::new( - "identify/1.0.0".to_string(), - key.public(), - )), + .with_behaviour(|key| { + // To content-address message, we can take the hash of message and use it as an ID. + let message_id_fn = |message: &gossipsub::Message| { + let mut s = DefaultHasher::new(); + message.data.hash(&mut s); + gossipsub::MessageId::from(s.finish().to_string()) + }; + let gossipsub_config = gossipsub::ConfigBuilder::default() + .message_id_fn(message_id_fn) + .build() + .map_err(|msg| Error::new(ErrorKind::Other, msg))?; + + Ok(BootstrapBehaviour { + // Rendezvous server behaviour for serving new peers to connecting nodes. + rendezvous: rendezvous::server::Behaviour::new( + rendezvous::server::Config::default() + .with_max_ttl(DEFAULT_REGISTRATION_TTL), // Max TTL of 24 hours + ), + // The identify behaviour is used to share the external address and the public key with connecting clients. + identify: identify::Behaviour::new(identify::Config::new( + IDENTIFY_PROTOCOL_VERSION.to_string(), + key.public(), + )), + gossipsub: gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Signed(key.clone()), + gossipsub_config, + )?, + request_response: request_response::cbor::Behaviour::new( + [( + StreamProtocol::new(REQUEST_RESPONSE_STREAM_PROTOCOL), + ProtocolSupport::Full, + )], + request_response::Config::default(), + ), + }) }) .map_err(|_| P2PError::InvalidBehaviourConfig)? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(10))) .build(); - Ok((swarm, self.address)) + Ok((swarm, self.address, self.bootstrap_addresses)) } } @@ -61,46 +106,199 @@ impl BootstrapConfig { pub(crate) async fn bootstrap( mut swarm: Swarm, addr: Multiaddr, + bootstrap_addresses: Vec, ) -> Result<(), P2PError> { info!("Starting P2P bootstrap node at {addr}"); swarm.listen_on(addr)?; + for addr in bootstrap_addresses { + info!("Attempting to dial peer at {addr}"); + if swarm.dial(addr.clone()).is_err() { + warn!("Failed to dial peer at address {addr}"); + } + } + swarm + .behaviour_mut() + .gossipsub + .subscribe(&IdentTopic::new(GOSSIP_TOPIC))?; + let mut registrations = HashMap::new(); loop { - match swarm.select_next_some().await { - SwarmEvent::NewListenAddr { address, .. } => { - info!("Listening on {}", address); + tokio::select! { + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {}", address); + } + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + info!("Connected to {}", peer_id); + } + SwarmEvent::ConnectionClosed { peer_id, .. } => { + info!("Disconnected from {}", peer_id); + } + SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous(event)) => on_rendezvous_event(&mut swarm, event, &mut registrations), + SwarmEvent::Behaviour(BootstrapBehaviourEvent::Gossipsub(event)) => on_gossipsub_event(event, *swarm.local_peer_id(), &mut registrations), + SwarmEvent::Behaviour(BootstrapBehaviourEvent::RequestResponse(event)) => on_request_response_event(&mut swarm, event, ®istrations), + other => debug!("Encountered event: {other:?}"), } - SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( - rendezvous::server::Event::PeerRegistered { peer, registration }, - )) => { - info!( - "Peer {} registered for namespace '{}' for {} seconds", - peer, registration.namespace, registration.ttl + } + } +} + +/// Handles events within the rendezvous protocol +fn on_rendezvous_event( + swarm: &mut Swarm, + event: rendezvous::server::Event, + registrations: &mut HashMap, +) { + match event { + rendezvous::server::Event::RegistrationExpired(registration) => { + let id = registration.record.peer_id(); + info!( + "Registration for peer {} expired in namespace {}", + id, registration.namespace + ); + // Registration expired, remove entry from hashmap + if registrations.remove(&id).is_none() { + error!( + "Could not remove registration for {:?} because it was not found", + id ); } - SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( - rendezvous::server::Event::DiscoverServed { - enquirer, - registrations, - }, - )) => { - if !registrations.is_empty() { - info!( - "Served peer {} with {} new registrations", - enquirer, - registrations.len() - ); + } + rendezvous::server::Event::PeerRegistered { peer, registration } => { + info!( + "Peer {} registered for namespace '{}' for {} seconds", + peer, registration.namespace, registration.ttl + ); + let peer_info = PeerInfo { + peer_id: peer, + multiaddrs: registration.record.addresses().to_vec(), + }; + // Serialize PeerInfo + let encoded_peer_info = match serde_json::to_vec(&peer_info) { + Ok(info) => info, + Err(..) => { + error!(peer_info:?; "Failed to serialize peer_info"); + return; } + }; + insert_or_update_registrations(registrations, peer_info); + // Send registration information to other bootstrap nodes. + match swarm + .behaviour_mut() + .gossipsub + .publish(IdentTopic::new(GOSSIP_TOPIC), encoded_peer_info) + { + Ok(..) => info!("Successfully published new peer info for peer {peer}"), + Err(e) => error!(e:?; "Failed to publish new peer info for peer {peer}"), } - SwarmEvent::Behaviour(BootstrapBehaviourEvent::Rendezvous( - rendezvous::server::Event::RegistrationExpired(registration), - )) => { - info!( - "Registration for peer {} expired in namespace {}", - registration.record.peer_id(), - registration.namespace - ); + } + other => debug!("Encountered other rendezvous event: {other:?}"), + } +} + +/// Handles events within the gossipsub protocol +fn on_gossipsub_event( + event: gossipsub::Event, + local_peer_id: PeerId, + registrations: &mut HashMap, +) { + match event { + // Received a message with peer information from another bootstrap node. + gossipsub::Event::Message { + propagation_source: peer_id, + message_id: id, + message, + } => { + // Got a message from ourselves, return early. + if peer_id == local_peer_id { + return; } - other => debug!("Encountered event: {other:?}"), + // Deserialize peer info + let peer_info: PeerInfo = match serde_json::from_slice(&message.data) { + Ok(info) => info, + Err(..) => { + error!(message:? = message.data; "Received invalid peer info from peer {peer_id:?}"); + return; + } + }; + info!( + "Got registration: {:?} with id: {} from peer: {:?}", + peer_info, id, peer_id + ); + insert_or_update_registrations(registrations, peer_info); } + other => debug!("Encountered other gossipsub event: {other:?}"), } } + +/// Handles events within the request_response protocol +fn on_request_response_event( + swarm: &mut Swarm, + event: request_response::Event, + registrations: &HashMap, +) { + match event { + // Message received, looking up the mapping + request_response::Event::Message { peer, message } => { + if let Message::Request { + request, + channel, + request_id, + } = message + { + info!("Got request with id {request_id} from {peer}"); + let id: PeerId = request.into(); + let response = match registrations.get(&id) { + Some(peer_info) => { + info!("Peer {id:?} found in registrations"); + PeerInfoResponse::Found(peer_info.clone()) + } + None => { + info!("Peer {id:?} not found in registrations"); + PeerInfoResponse::NotFound(id.into()) + } + }; + // Sending the peer information back to the client who opened the channel. + // Could add retries here. + if swarm + .behaviour_mut() + .request_response + .send_response(channel, response) + .is_err() + { + error!("Failed to send peer info to {peer:?}"); + } + } + } + request_response::Event::OutboundFailure { + peer, + request_id, + error, + } => warn!("Failed to send message with id {request_id} to {peer}: {error}"), + request_response::Event::InboundFailure { + peer, + request_id, + error, + } => warn!("Failed to receive message with id {request_id} from {peer}: {error}"), + request_response::Event::ResponseSent { peer, request_id } => { + debug!("Request with id {request_id} sent to {peer}") + } + } +} + +/// Take the HashMap and update the entry based on peer_info.peer_id +/// or insert a new entry. +fn insert_or_update_registrations( + registrations: &mut HashMap, + peer_info: PeerInfo, +) { + registrations + .entry(peer_info.peer_id) + .and_modify(|info| { + for addr in peer_info.multiaddrs.iter() { + if !info.multiaddrs.contains(addr) { + info.multiaddrs.push(addr.clone()) + } + } + }) + .or_insert(peer_info); +} diff --git a/node/src/service/p2p/mod.rs b/node/src/service/p2p/mod.rs index ea2e5f5ea..16f59a4a2 100644 --- a/node/src/service/p2p/mod.rs +++ b/node/src/service/p2p/mod.rs @@ -5,8 +5,6 @@ mod bootstrap; pub(crate) use bootstrap::BootstrapConfig; -const DEFAULT_REGISTRATION_TTL: u64 = 86400; - #[derive(Debug, thiserror::Error)] pub enum P2PError { #[error(transparent)] @@ -19,15 +17,19 @@ pub enum P2PError { InvalidBehaviourConfig, #[error(transparent)] P2PTransport(#[from] libp2p::TransportError), + #[error(transparent)] + P2PSubscription(#[from] libp2p::gossipsub::SubscriptionError), } /// Runs a bootstrap node from the given config. /// The `CancellationToken` is used for a graceful shutdown if the user presses ctrl+c pub async fn run_bootstrap_node(config: BootstrapConfig) { info!("Starting P2P bootstrap node"); - let (swarm, addr) = config.create_swarm().expect("Could not create swarm"); + let (swarm, addr, bootstrap_addresses) = config + .create_swarm() + .expect("Could not create bootstrap swarm"); - bootstrap(swarm, addr) + bootstrap(swarm, addr, bootstrap_addresses) .await .expect("Could not run bootstrap node"); } diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 4f3d5e55f..7e299ff9d 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -8,6 +8,7 @@ pub mod sector; #[cfg(feature = "std")] pub mod p2p; + #[cfg(feature = "testing")] pub mod testing { // NOTE(@jmg-duarte,22/01/2025): Since there's only one thing, star import for now. diff --git a/primitives/src/p2p.rs b/primitives/src/p2p.rs index 3132a4784..afa06da4c 100644 --- a/primitives/src/p2p.rs +++ b/primitives/src/p2p.rs @@ -1,11 +1,16 @@ use std::{path::PathBuf, str::FromStr}; use ed25519_dalek::{pkcs8::DecodePrivateKey, SigningKey}; -use libp2p::identity::Keypair; +use libp2p::{identity::Keypair, Multiaddr, PeerId}; +use serde::{de, Deserialize, Serialize, Serializer}; + +pub const GOSSIP_TOPIC: &str = "registrar"; +pub const IDENTIFY_PROTOCOL_VERSION: &str = "identify/1.0.0"; +pub const REQUEST_RESPONSE_STREAM_PROTOCOL: &str = "/resolver/1.0.0"; +pub const DEFAULT_REGISTRATION_TTL: u64 = 86400; /// Parses a ED25519 private key into a Keypair. /// Takes in a private key or the path to a PEM file, depending on the @ prefix. -#[cfg(feature = "std")] pub fn keypair_value_parser(src: &str) -> Result { let key = if let Some(stripped) = src.strip_prefix('@') { let path = PathBuf::from_str(stripped) @@ -19,3 +24,56 @@ pub fn keypair_value_parser(src: &str) -> Result { }; Keypair::ed25519_from_bytes(key.to_bytes()).map_err(|e| e.to_string()) } + +/// Struct holds peer information. +/// - PeerId: Registered Peer ID. +/// - multiaddrs: Vec of multiaddresses the peer has registered. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfo { + #[serde(serialize_with = "serialize_peer_id")] + #[serde(deserialize_with = "deserialize_peer_id")] + pub peer_id: PeerId, + pub multiaddrs: Vec, +} + +/// This enum is used in the request response P2P protocol. +/// PeerInfoResponse::NotFound is returned when the requested peer ID was not found. +/// PeerInfoResponse::Found(..) is returned when the requested peer ID was found. +/// The latter holds the relevant [`PeerInfo`] inside. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum PeerInfoResponse { + Found(PeerInfo), + NotFound(PeerIdRequest), +} + +/// The request type used for the request response P2P protocol. +/// We cannot use PeerId directly because it does not implement +/// Serialize and Deserialize. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerIdRequest( + #[serde(serialize_with = "serialize_peer_id")] + #[serde(deserialize_with = "deserialize_peer_id")] + PeerId, +); + +impl From for PeerIdRequest { + fn from(value: PeerId) -> Self { + Self(value) + } +} + +impl Into for PeerIdRequest { + fn into(self) -> PeerId { + self.0 + } +} + +fn deserialize_peer_id<'de, D: de::Deserializer<'de>>(d: D) -> Result { + let s: String = de::Deserialize::deserialize(d)?; + PeerId::from_str(&s).map_err(de::Error::custom) +} + +fn serialize_peer_id(id: &PeerId, serializer: S) -> Result { + let id = id.to_string(); + serializer.collect_str(&id) +} diff --git a/storage-provider/client/Cargo.toml b/storage-provider/client/Cargo.toml index e593eb04f..1b91e4223 100644 --- a/storage-provider/client/Cargo.toml +++ b/storage-provider/client/Cargo.toml @@ -36,7 +36,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true } [dev-dependencies] -libp2p = { workspace = true, features = ["identify", "macros", "noise", "rendezvous", "tcp", "tokio", "yamux"] } +libp2p = { workspace = true, features = ["cbor", "macros", "noise", "request-response", "tcp", "tokio", "yamux"] } [lints] workspace = true diff --git a/storage-provider/client/examples/peer-resolver.rs b/storage-provider/client/examples/peer-resolver.rs index 6ec1541ab..e9200a553 100644 --- a/storage-provider/client/examples/peer-resolver.rs +++ b/storage-provider/client/examples/peer-resolver.rs @@ -1,133 +1,130 @@ -//! Peer Resolver example -//! -//! This example shows how to use the rendezvous client protocol to -//! connect to rendezvous bootstrap, and send a discovery message, -//! requesting the bootstrap node to return their registrations. -//! Then it will check the registrations to see if a given Peer ID -//! is contained in them to get a Peer ID to multiaddr mapping. -//! If the bootstrap node does not have information on the given Peer -//! ID, the example will return an error. -//! NOTE: This example is to be removed and implemented into the -//! client at some point. -use std::{error::Error, time::Duration}; +//! This example show how to connect to a bootstrap node within the P2P network +//! and request a Peer ID to Multiaddrs mapping. +//! This client uses libp2p's request response protocol to request a Multiaddrs +//! connected to a given Peer ID. +//! The Multiaddr of the bootstrap node needs to be known because the client +//! needs to dial (connect) to the bootstrap node to send a request. +//! The Peer ID of the bootstrap node needs to be known to send the request +//! to the bootstrap node. +use std::time::Duration; use clap::Parser; use libp2p::{ futures::StreamExt, noise, - rendezvous::client::{Behaviour, Event}, + request_response::{self, Message, ProtocolSupport}, swarm::SwarmEvent, - tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, + tcp, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder, }; +use primitives::p2p::{PeerIdRequest, PeerInfoResponse, REQUEST_RESPONSE_STREAM_PROTOCOL}; use tracing_subscriber::EnvFilter; -#[derive(Debug)] -struct PeerInfo { - peer_id: PeerId, - multiaddresses: Vec, -} - -#[derive(Parser)] -struct Cli { - /// Peer ID to resolve - #[arg(long)] - peer_id: PeerId, - - /// Rendezvous point address of the bootstrap node - #[arg(long)] - rendezvous_point_address: Multiaddr, - - /// PeerID of the bootstrap node - #[arg(long)] - rendezvous_point: PeerId, -} - -fn create_swarm() -> Result, Box> { - Ok(SwarmBuilder::with_new_identity() +/// Create a discovery swarm +fn create_discover_swarm( +) -> Result>, String> { + let swarm = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( tcp::Config::default(), noise::Config::new, yamux::Config::default, - )? - .with_behaviour(|key| Behaviour::new(key.clone()))? + ) + .map_err(|e| format!("{e:?}"))? + .with_behaviour(|_| { + request_response::cbor::Behaviour::new( + [( + StreamProtocol::new(REQUEST_RESPONSE_STREAM_PROTOCOL), + ProtocolSupport::Full, + )], + request_response::Config::default(), + ) + }) + .map_err(|e| format!("{e:?}"))? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(10))) - .build()) + .build(); + Ok(swarm) } -async fn discover( - swarm: &mut Swarm, - peer_id_to_find: PeerId, - rendezvous_point_address: Multiaddr, - rendezvous_point: PeerId, -) -> Result> { - // Dial in to the rendezvous point. - swarm.dial(rendezvous_point_address)?; +/// Run the discovery swarm and request the peer ID to multiaddrs mapping. +async fn run_discover( + mut swarm: Swarm>, + bootstrap_addr: Multiaddr, + bootstrap_id: &PeerId, + resolve_id: PeerId, +) -> Result { + swarm.dial(bootstrap_addr).map_err(|e| format!("{e:?}"))?; loop { - match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - if peer_id == rendezvous_point { - tracing::info!("Connection established with rendezvous point {}", peer_id); - - // Requesting rendezvous point for peer discovery - swarm - .behaviour_mut() - .discover(None, None, None, rendezvous_point); - } - } - // Received discovered event from the rendezvous point - SwarmEvent::Behaviour(Event::Discovered { registrations, .. }) => { - // Check registrations - for registration in ®istrations { - // Get peer ID from the registration record - let peer_id = registration.record.peer_id(); - // skip self - if &peer_id == swarm.local_peer_id() { - continue; + tokio::select! { + event = swarm.select_next_some() => match event { + SwarmEvent::Behaviour(event) => match event { + request_response::Event::Message { peer, message } => { + if let Message::Response { + request_id, + response, + } = message + { + tracing::info!("Received response with id {request_id} from {peer}"); + return Ok(response); + } + } + request_response::Event::OutboundFailure { + peer, + request_id, + error, + } => { + tracing::error!("Failed to send message with id {request_id} to {peer}: {error}"); + return Err(format!("Failed to send message with id {request_id} to {peer}: {error}")); } - if peer_id == peer_id_to_find { - return Ok(PeerInfo { - peer_id, - multiaddresses: registration.record.addresses().to_vec(), - }); + request_response::Event::InboundFailure { + peer, + request_id, + error, + } => { + tracing::error!("Failed to receive message with id {request_id} from {peer}: {error}"); + return Err(format!("Failed to receive message with id {request_id} from {peer}: {error}")); } + other => tracing::debug!("Unreachable event: {other:?}") + }, + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + tracing::info!("Connected to {}", peer_id); + swarm.behaviour_mut().send_request(bootstrap_id, resolve_id.into()); } - return Err(format!( - "No registered multi-addresses found for Peer ID {peer_id_to_find}" - ) - .into()); + other => tracing::debug!("Received other event: {other:?}"), } - - other => tracing::debug!("Other event: {other:?}"), } } } +#[derive(Parser)] +struct Cli { + /// Multiaddr of the bootstrap node. + #[arg(long)] + bootstrap_addr: Multiaddr, + /// PeerID of the bootstrap node. + #[arg(long)] + bootstrap_id: PeerId, + /// Peer ID to request the Multiaddrs for. + #[arg(long)] + resolve_id: PeerId, +} + #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), String> { let _ = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .try_init(); - let args = Cli::parse(); - - let mut swarm = create_swarm()?; - match discover( - &mut swarm, - args.peer_id, - args.rendezvous_point_address, - args.rendezvous_point, - ) - .await - { - Ok(peer_info) => { - println!("Found peer with Peer ID {}", args.peer_id); - println!("Peer Info:"); - println!("Peer ID: {}", peer_info.peer_id); - println!("Multiaddresses: {:?}", peer_info.multiaddresses); - } - Err(e) => eprintln!("{e}"), + let cli = Cli::parse(); + let swarm = create_discover_swarm()?; + println!("Attempting to get multiaddrs for peer {:?}", cli.resolve_id); + let peer_info = + run_discover(swarm, cli.bootstrap_addr, &cli.bootstrap_id, cli.resolve_id).await?; + match peer_info { + PeerInfoResponse::NotFound(peer) => println!("Peer {:?} is not registered", peer), + PeerInfoResponse::Found(info) => println!( + "Got multiaddrs {:#?} for peer {:?}", + info.multiaddrs, info.peer_id + ), } - Ok(()) } diff --git a/storage-provider/server/src/config.rs b/storage-provider/server/src/config.rs index 66e0c2603..c6c7f38a0 100644 --- a/storage-provider/server/src/config.rs +++ b/storage-provider/server/src/config.rs @@ -8,7 +8,7 @@ use clap::Args; use libp2p::{identity::Keypair, Multiaddr, PeerId}; use polka_storage_provider_common::config::sealing::SealingConfiguration; use primitives::{ - p2p::keypair_value_parser, + p2p::{keypair_value_parser, DEFAULT_REGISTRATION_TTL}, proofs::{RegisteredPoStProof, RegisteredSealProof}, }; use serde::Deserialize; @@ -19,8 +19,6 @@ use crate::{ DEFAULT_NODE_ADDRESS, }; -pub const DEFAULT_REGISTRATION_TTL: u64 = 86400; - /// Default address to bind the RPC server to. const fn default_rpc_listen_address() -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000) diff --git a/zombienet/local-david-collator.toml b/zombienet/local-david-collator.toml new file mode 100644 index 000000000..42c4cd278 --- /dev/null +++ b/zombienet/local-david-collator.toml @@ -0,0 +1,37 @@ +[relaychain] +chain = "rococo-local" +# That's because in Nix supplied `polkadot` binary, `SKIP_WASM_BUILD=1` is disabled. +# https://github.com/andresilva/polkadot.nix/blob/ac4e91a0c75cf422b80f06d6edcecb9bf63727e3/pkgs/polkadot-sdk/generic.nix#L84 +# Needed to generate chainspec by directly downloading polkadot binary (https://github.com/paritytech/polkadot-sdk/releases/download/polkadot-stable2412-1/polkadot) +# and running /polkadot build-spec --chain rococo-local > rococo-local.json`. +# We won't need to add `chain_spec_path` when https://github.com/paritytech/polkadot-sdk/pull/7008 is merged. +chain_spec_path = "./zombienet/rococo-local.json" +default_args = ["--detailed-log-output", "-lparachain=debug,xcm=trace,runtime=trace"] +default_command = "polkadot" + +[[relaychain.nodes]] +name = "erin" +validator = true + +[[parachains]] +cumulus_based = true + +# We need to use a Parachain of an existing System Chain (https://github.com/paritytech/polkadot-sdk/blob/master/polkadot/runtime/rococo/src/xcm_config.rs). +# The reason: being able to get native DOTs from Relay Chain to Parachain via XCM Teleport. +# We'll have a proper Parachain ID in the *future*, but for now, let's stick to 1000 (which is AssetHub and trusted). +id = 1000 + +# run david as parachain collator +[[parachains.collators]] +args = [ + "--bootstrap-addresses=/ip4/127.0.0.1/tcp/62649", + "--detailed-log-output", + "--p2p-key=@/tmp/zombienet/david-private.pem", + "--p2p-listen-address=/ip4/127.0.0.1/tcp/1337", + "--pool-type=fork-aware", + "-lparachain=debug,xcm=trace,runtime=trace,txpool=debug,basic-authorship=debug", +] +command = "target/release/polka-storage-node" +name = "david" +validator = true +ws_port = 42068 diff --git a/zombienet/local-kube-testnet.toml b/zombienet/local-kube-testnet.toml index 92c89e562..2edaff410 100644 --- a/zombienet/local-kube-testnet.toml +++ b/zombienet/local-kube-testnet.toml @@ -26,8 +26,9 @@ id = 1000 # run charlie as parachain collator [[parachains.collators]] args = [ + "--bootstrap-addresses=/ip4/127.0.0.1/tcp/1337", "--detailed-log-output", - "--p2p-key=@/tmp/zombienet/private.pem", + "--p2p-key=@/tmp/zombienet/charlie-private.pem", "--p2p-listen-address=/ip4/127.0.0.1/tcp/62649", "-lparachain=debug,xcm=trace,runtime=trace", ] diff --git a/zombienet/local-testnet.toml b/zombienet/local-testnet.toml index 60ea29229..c0e3a0137 100644 --- a/zombienet/local-testnet.toml +++ b/zombienet/local-testnet.toml @@ -28,8 +28,9 @@ id = 1000 # run charlie as parachain collator [[parachains.collators]] args = [ + "--bootstrap-addresses=/ip4/127.0.0.1/tcp/1337", "--detailed-log-output", - "--p2p-key=@/tmp/zombienet/private.pem", + "--p2p-key=@/tmp/zombienet/charlie-private.pem", "--p2p-listen-address=/ip4/127.0.0.1/tcp/62649", "--pool-type=fork-aware", "-lparachain=debug,xcm=trace,runtime=trace,txpool=debug,basic-authorship=debug",