diff --git a/Cargo.lock b/Cargo.lock index 8e92370dc..32220e0f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14094,6 +14094,7 @@ dependencies = [ "libp2p-swarm 0.45.1", "mater", "multihash-codetable", + "rand", "thiserror 2.0.8", "tokio", "tracing", diff --git a/storage-retrieval/cli/src/main.rs b/storage-retrieval/cli/src/main.rs index 03c474c13..1b76f5287 100644 --- a/storage-retrieval/cli/src/main.rs +++ b/storage-retrieval/cli/src/main.rs @@ -15,7 +15,7 @@ use tracing_subscriber::{ struct Cli { /// Provider used for data download #[arg(long)] - provider: Multiaddr, + provider: Vec, /// The output file to write to. #[arg(long)] @@ -46,7 +46,7 @@ async fn main() -> Result<(), anyhow::Error> { let arguments = Cli::parse(); let settings = ClientSettings::new(arguments.output, arguments.extract, arguments.overwrite); - let client = Client::new(vec![arguments.provider], arguments.payload_cid, settings).await?; + let client = Client::new(arguments.provider, arguments.payload_cid, settings).await?; let download_result = match arguments.timeout { Some(duration) => timeout(duration, client.download()).await, diff --git a/storage-retrieval/lib/Cargo.toml b/storage-retrieval/lib/Cargo.toml index 9abc9b2f7..5ff62fb06 100644 --- a/storage-retrieval/lib/Cargo.toml +++ b/storage-retrieval/lib/Cargo.toml @@ -28,5 +28,6 @@ tracing = { workspace = true } [dev-dependencies] multihash-codetable = { workspace = true, features = ["sha2"] } +rand = { workspace = true, default-features = true } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/storage-retrieval/lib/examples/chaos_server.rs b/storage-retrieval/lib/examples/chaos_server.rs new file mode 100644 index 000000000..815ee16d7 --- /dev/null +++ b/storage-retrieval/lib/examples/chaos_server.rs @@ -0,0 +1,133 @@ +//! The example showcases how to setup a retrieval server with the simple +//! blockstore. Because the server is simple it is used for manual testing of +//! the retrieval client. + +use std::{any::type_name, env::args, sync::Arc, time::Duration}; + +use anyhow::{Context, Result}; +use blockstore::Blockstore; +use libp2p::Multiaddr; +use mater::blockstore::ReadOnlyBlockstore; +use polka_storage_retrieval::server::Server; +use rand::prelude::*; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncSeek}, +}; + +const DEFAULT_PORT: u16 = 8989; + +/// Adds random delays on the `get` calls. +struct ChaosReadOnlyStore(ReadOnlyBlockstore); + +impl Blockstore for ChaosReadOnlyStore +where + R: AsyncRead + AsyncSeek + Unpin + blockstore::cond_send::CondSync, +{ + fn get( + &self, + cid: &cid::CidGeneric, + ) -> impl futures::Future>>> + + blockstore::cond_send::CondSend { + async { + if rand::thread_rng().gen_bool(0.5) { + let dur = Duration::from_millis(thread_rng().gen_range(250..=1000)); + tracing::info!("sleeping for {}", dur.as_millis()); + tokio::time::sleep(dur).await; + } + self.0.get(cid).await + } + } + + fn put_keyed( + &self, + _: &cid::CidGeneric, + _: &[u8], + ) -> impl futures::Future> + blockstore::cond_send::CondSend + { + async { + Err(blockstore::Error::FatalDatabaseError(format!( + "{} is read-only", + type_name::() + ))) + } + } + + fn remove( + &self, + _: &cid::CidGeneric, + ) -> impl futures::Future> + blockstore::cond_send::CondSend + { + async { + Err(blockstore::Error::FatalDatabaseError(format!( + "{} is read-only", + type_name::() + ))) + } + } + + fn close( + self, + ) -> impl futures::Future> + blockstore::cond_send::CondSend + { + async { Ok(()) } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + // Init tracing + let _guard = init_tracing(); + + // Avoiding importing clap + let args = args().collect::>(); + let port = match args.get(1).map(|port| port.trim().parse::()) { + Some(Ok(port)) => port, + Some(Err(err)) => return Err(err).context("failed to parse server port"), + None => DEFAULT_PORT, + }; + + let file = match args.get(2) { + Some(path) => File::open(path).await?, + // If there isn't a port, the argument will be at the first position + None => match args.get(1) { + Some(path) => File::open(path).await?, + None => File::open("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car").await?, + }, + }; + + // Example blockstore providing only a single file. + let blockstore = Arc::new(ChaosReadOnlyStore(ReadOnlyBlockstore::new(file).await?)); + + let roots = blockstore.write().await.roots().await?; + tracing::info!("available roots: {:?}", roots); + + // Setup & run the server + let server = Server::new(blockstore)?; + let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port).parse()?; + tracing::info!(multiaddress = %listener); + + server.run(vec![listener], std::future::pending()).await?; + + Ok(()) +} + +fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard { + let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout()); + + let filter = tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + .from_env_lossy(); + + tracing_subscriber::fmt() + .event_format( + tracing_subscriber::fmt::format() + .with_file(true) + .with_line_number(true), + ) + .with_env_filter(filter) + .with_writer(non_blocking) + .init(); + + guard +} diff --git a/storage-retrieval/lib/examples/simple_server.rs b/storage-retrieval/lib/examples/simple_server.rs index 649152cf0..932132be7 100644 --- a/storage-retrieval/lib/examples/simple_server.rs +++ b/storage-retrieval/lib/examples/simple_server.rs @@ -2,30 +2,47 @@ //! blockstore. Because the server is simple it is used for manual testing of //! the retrieval client. -use std::sync::Arc; +use std::{env::args, sync::Arc}; -use anyhow::Result; +use anyhow::{Context, Result}; use libp2p::Multiaddr; use mater::blockstore::ReadOnlyBlockstore; use polka_storage_retrieval::server::Server; +use tokio::fs::File; + +const DEFAULT_PORT: u16 = 8989; #[tokio::main] async fn main() -> Result<()> { // Init tracing let _guard = init_tracing(); + // Avoiding importing clap + let args = args().collect::>(); + let port = match args.get(1).map(|port| port.trim().parse::()) { + Some(Ok(port)) => port, + Some(Err(err)) => return Err(err).context("failed to parse server port"), + None => DEFAULT_PORT, + }; + + let file = match args.get(2) { + Some(path) => File::open(path).await?, + // If there isn't a port, the argument will be at the first position + None => match args.get(1) { + Some(path) => File::open(path).await?, + None => File::open("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car").await?, + }, + }; + // Example blockstore providing only a single file. - let blockstore = Arc::new( - ReadOnlyBlockstore::from_path("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car") - .await?, - ); + let blockstore = Arc::new((ReadOnlyBlockstore::new(file).await?)); let roots = blockstore.write().await.roots().await?; tracing::info!("available roots: {:?}", roots); // Setup & run the server let server = Server::new(blockstore)?; - let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?; + let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port).parse()?; tracing::info!(multiaddress = %listener); server.run(vec![listener], std::future::pending()).await?; diff --git a/storage-retrieval/lib/src/client.rs b/storage-retrieval/lib/src/client.rs index cc99daae8..0abc10923 100644 --- a/storage-retrieval/lib/src/client.rs +++ b/storage-retrieval/lib/src/client.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + sync::Arc, +}; use beetswap::{Event, QueryId}; use blockstore::Blockstore; @@ -74,8 +78,8 @@ pub struct Client { blockstore: ReadWriteBlockstore, /// Content roots being downloaded. root: Cid, - - structure: HashMap, + /// CAR block DAG mapping children to parents. (The A in DAG isn't checked!) + dag: HashMap, } impl Client { @@ -112,7 +116,7 @@ impl Client { queries: HashMap::new(), blockstore, root, - structure: HashMap::new(), + dag: HashMap::new(), }) } @@ -152,12 +156,11 @@ impl Client { let inner = self.blockstore.into_inner(); let mut file = inner .finish_with_roots( - self.structure + self.dag .values() .copied() - // We're expecting a single one, so this should be ok - // if any issues arise, we can use an HashSet - .filter(|cid| self.structure.contains_key(cid)), + .filter(|cid| !self.dag.contains_key(cid)) + .collect::>(), ) .await?; @@ -251,7 +254,7 @@ impl Client { node.links.iter().map(|link| link.cid).for_each(|l_cid| { tracing::debug!("inserting {}: {}", l_cid, cid); - self.structure.insert(l_cid, cid); + self.dag.insert(l_cid, cid); self.request_block(l_cid); }); }