Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fetch): add support for multiple providers #784

Merged
merged 1 commit into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions storage-retrieval/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing_subscriber::{
struct Cli {
/// Provider used for data download
#[arg(long)]
provider: Multiaddr,
provider: Vec<Multiaddr>,

/// The output file to write to.
#[arg(long)]
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn main() -> Result<(), anyhow::Error> {
let arguments = Cli::parse();

let settings = ClientSettings::new(arguments.output, arguments.extract, arguments.overwrite);
let client = Client::new(vec![arguments.provider], arguments.payload_cid, settings).await?;
let client = Client::new(arguments.provider, arguments.payload_cid, settings).await?;

let download_result = match arguments.timeout {
Some(duration) => timeout(duration, client.download()).await,
Expand Down
1 change: 1 addition & 0 deletions storage-retrieval/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ tracing = { workspace = true }

[dev-dependencies]
multihash-codetable = { workspace = true, features = ["sha2"] }
rand = { workspace = true, default-features = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
133 changes: 133 additions & 0 deletions storage-retrieval/lib/examples/chaos_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//! The example showcases how to setup a retrieval server with the simple
//! blockstore. Because the server is simple it is used for manual testing of
//! the retrieval client.

use std::{any::type_name, env::args, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use blockstore::Blockstore;
use libp2p::Multiaddr;
use mater::blockstore::ReadOnlyBlockstore;
use polka_storage_retrieval::server::Server;
use rand::prelude::*;
use tokio::{
fs::File,
io::{AsyncRead, AsyncSeek},
};

const DEFAULT_PORT: u16 = 8989;

/// Adds random delays on the `get` calls.
struct ChaosReadOnlyStore<R>(ReadOnlyBlockstore<R>);

impl<R> Blockstore for ChaosReadOnlyStore<R>
where
R: AsyncRead + AsyncSeek + Unpin + blockstore::cond_send::CondSync,
{
fn get<const S: usize>(
&self,
cid: &cid::CidGeneric<S>,
) -> impl futures::Future<Output = blockstore::Result<Option<Vec<u8>>>>
+ blockstore::cond_send::CondSend {
async {
if rand::thread_rng().gen_bool(0.5) {
let dur = Duration::from_millis(thread_rng().gen_range(250..=1000));
tracing::info!("sleeping for {}", dur.as_millis());
tokio::time::sleep(dur).await;
}
self.0.get(cid).await
}
}

fn put_keyed<const S: usize>(
&self,
_: &cid::CidGeneric<S>,
_: &[u8],
) -> impl futures::Future<Output = blockstore::Result<()>> + blockstore::cond_send::CondSend
{
async {
Err(blockstore::Error::FatalDatabaseError(format!(
"{} is read-only",
type_name::<Self>()
)))
}
}

fn remove<const S: usize>(
&self,
_: &cid::CidGeneric<S>,
) -> impl futures::Future<Output = blockstore::Result<()>> + blockstore::cond_send::CondSend
{
async {
Err(blockstore::Error::FatalDatabaseError(format!(
"{} is read-only",
type_name::<Self>()
)))
}
}

fn close(
self,
) -> impl futures::Future<Output = blockstore::Result<()>> + blockstore::cond_send::CondSend
{
async { Ok(()) }
}
}

#[tokio::main]
async fn main() -> Result<()> {
// Init tracing
let _guard = init_tracing();

// Avoiding importing clap
let args = args().collect::<Vec<_>>();
let port = match args.get(1).map(|port| port.trim().parse::<u16>()) {
Some(Ok(port)) => port,
Some(Err(err)) => return Err(err).context("failed to parse server port"),
None => DEFAULT_PORT,
};

let file = match args.get(2) {
Some(path) => File::open(path).await?,
// If there isn't a port, the argument will be at the first position
None => match args.get(1) {
Some(path) => File::open(path).await?,
None => File::open("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car").await?,
},
};

// Example blockstore providing only a single file.
let blockstore = Arc::new(ChaosReadOnlyStore(ReadOnlyBlockstore::new(file).await?));

let roots = blockstore.write().await.roots().await?;
tracing::info!("available roots: {:?}", roots);

// Setup & run the server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port).parse()?;
tracing::info!(multiaddress = %listener);

server.run(vec![listener], std::future::pending()).await?;

Ok(())
}

fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
.from_env_lossy();

tracing_subscriber::fmt()
.event_format(
tracing_subscriber::fmt::format()
.with_file(true)
.with_line_number(true),
)
.with_env_filter(filter)
.with_writer(non_blocking)
.init();

guard
}
31 changes: 24 additions & 7 deletions storage-retrieval/lib/examples/simple_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,47 @@
//! blockstore. Because the server is simple it is used for manual testing of
//! the retrieval client.

use std::sync::Arc;
use std::{env::args, sync::Arc};

use anyhow::Result;
use anyhow::{Context, Result};
use libp2p::Multiaddr;
use mater::blockstore::ReadOnlyBlockstore;
use polka_storage_retrieval::server::Server;
use tokio::fs::File;

const DEFAULT_PORT: u16 = 8989;

#[tokio::main]
async fn main() -> Result<()> {
// Init tracing
let _guard = init_tracing();

// Avoiding importing clap
let args = args().collect::<Vec<_>>();
let port = match args.get(1).map(|port| port.trim().parse::<u16>()) {
Some(Ok(port)) => port,
Some(Err(err)) => return Err(err).context("failed to parse server port"),
None => DEFAULT_PORT,
};

let file = match args.get(2) {
Some(path) => File::open(path).await?,
// If there isn't a port, the argument will be at the first position
None => match args.get(1) {
Some(path) => File::open(path).await?,
None => File::open("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car").await?,
},
};

// Example blockstore providing only a single file.
let blockstore = Arc::new(
ReadOnlyBlockstore::from_path("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car")
.await?,
);
let blockstore = Arc::new((ReadOnlyBlockstore::new(file).await?));

let roots = blockstore.write().await.roots().await?;
tracing::info!("available roots: {:?}", roots);

// Setup & run the server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port).parse()?;
tracing::info!(multiaddress = %listener);

server.run(vec![listener], std::future::pending()).await?;
Expand Down
21 changes: 12 additions & 9 deletions storage-retrieval/lib/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::Arc,
};

use beetswap::{Event, QueryId};
use blockstore::Blockstore;
Expand Down Expand Up @@ -74,8 +78,8 @@ pub struct Client {
blockstore: ReadWriteBlockstore<File>,
/// Content roots being downloaded.
root: Cid,

structure: HashMap<Cid, Cid>,
/// CAR block DAG mapping children to parents. (The A in DAG isn't checked!)
dag: HashMap<Cid, Cid>,
}

impl Client {
Expand Down Expand Up @@ -112,7 +116,7 @@ impl Client {
queries: HashMap::new(),
blockstore,
root,
structure: HashMap::new(),
dag: HashMap::new(),
})
}

Expand Down Expand Up @@ -152,12 +156,11 @@ impl Client {
let inner = self.blockstore.into_inner();
let mut file = inner
.finish_with_roots(
self.structure
self.dag
.values()
.copied()
// We're expecting a single one, so this should be ok
// if any issues arise, we can use an HashSet
.filter(|cid| self.structure.contains_key(cid)),
.filter(|cid| !self.dag.contains_key(cid))
.collect::<HashSet<_>>(),
)
.await?;

Expand Down Expand Up @@ -251,7 +254,7 @@ impl Client {

node.links.iter().map(|link| link.cid).for_each(|l_cid| {
tracing::debug!("inserting {}: {}", l_cid, cid);
self.structure.insert(l_cid, cid);
self.dag.insert(l_cid, cid);
self.request_block(l_cid);
});
}
Expand Down