From 74aacb47bac202dcfee897a6b5016b07157c8c67 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Duarte?=
Date: Mon, 24 Feb 2025 12:50:49 +0000
Subject: [PATCH] feat(mater): add mater::ReadOnlyBlockstore (#774)

---
 mater/lib/src/cid.rs                        |   4 +-
 mater/lib/src/file_reader.rs                | 255 ++++++++++++++++--
 mater/lib/src/lib.rs                        |  12 +-
 mater/lib/src/stores/file.rs                |  18 +-
 mater/lib/src/stores/mod.rs                 |  21 ++
 mater/lib/src/v1/mod.rs                     |   2 +-
 storage-retrieval/cli/src/main.rs           |   2 +-
 .../lib/examples/simple_server.rs           |   9 +-
 8 files changed, 280 insertions(+), 43 deletions(-)

diff --git a/mater/lib/src/cid.rs b/mater/lib/src/cid.rs
index 6c11e6891..7b0f82d9b 100644
--- a/mater/lib/src/cid.rs
+++ b/mater/lib/src/cid.rs
@@ -30,7 +30,7 @@ impl<const S: usize> CidExt for CidGeneric<S> {
     }
 
     /// Async implementation of
-    /// https://github.com/multiformats/rust-cid/blob/eb03f566e9bfb19bad79b2691dbcb2541627c0b3/src/cid.rs#L143C12-L143C22
+    /// <https://github.com/multiformats/rust-cid/blob/eb03f566e9bfb19bad79b2691dbcb2541627c0b3/src/cid.rs#L143C12-L143C22>
     async fn read_bytes_async<R>(mut r: R) -> Result<(Self, usize), Error>
     where
         R: AsyncRead + Unpin,
@@ -63,7 +63,7 @@ impl<const S: usize> CidExt for CidGeneric<S> {
 
 impl<const S: usize> MultihashExt for Multihash<S> {
     /// Async implementation of
-    /// https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271
+    /// <https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271>
     async fn read_async<R>(mut r: R) -> Result<(Self, usize), Error>
     where
         R: AsyncRead + Unpin,
diff --git a/mater/lib/src/file_reader.rs b/mater/lib/src/file_reader.rs
index 97898a62d..07f50b19b 100644
--- a/mater/lib/src/file_reader.rs
+++ b/mater/lib/src/file_reader.rs
@@ -17,39 +17,40 @@ use crate::{multicodec, v1::BlockMetadata, v2, Error};
 
 /// Extracts the raw data from a CARv2 file.
 /// It expects the CAR file to have only 1 root.
-pub struct CarExtractor<R>
-where
-    R: AsyncRead + AsyncSeek + Unpin,
-{
+pub struct CarExtractor<R> {
     reader: v2::Reader<R>,
     index: HashMap<Cid, BlockMetadata>,
 }
 
+impl<R> CarExtractor<R> {
+    /// Creates a new [`CarExtractor`] from the given reader.
+    pub async fn new(reader: R) -> Result<Self, Error>
+    where
+        R: AsyncRead + AsyncSeek + Unpin,
+    {
+        let mut self_ = Self {
+            reader: v2::Reader::new(reader),
+            index: HashMap::with_capacity(1),
+        };
+        self_.naive_build_index().await?;
+        Ok(self_)
+    }
+}
+
 impl CarExtractor<File> {
     /// Creates a [`CarExtractor`] from the given file path.
     pub async fn from_path<P>(path: P) -> Result<Self, Error>
     where
         P: AsRef<Path>,
     {
-        let file = File::open(path).await?;
-        let mut loader = Self {
-            reader: v2::Reader::new(file),
-            index: HashMap::new(),
-        };
-        loader.naive_build_index().await?;
-        Ok(loader)
+        Self::new(File::open(path).await?).await
     }
 }
 
 impl CarExtractor<Cursor<Vec<u8>>> {
     /// Creates a [`CarExtractor`] from a vector of bytes.
     pub async fn from_vec(vec: Vec<u8>) -> Result<Self, Error> {
-        let mut loader = Self {
-            reader: v2::Reader::new(Cursor::new(vec)),
-            index: HashMap::new(),
-        };
-        loader.naive_build_index().await?;
-        Ok(loader)
+        Self::new(Cursor::new(vec)).await
     }
 }
 
@@ -170,11 +171,227 @@ where
     }
 }
 
+#[cfg(feature = "blockstore")]
+pub(crate) mod blockstore {
+    use std::{any::type_name, ops::Deref, path::Path};
+
+    use blockstore::Blockstore;
+    use futures::TryFutureExt;
+    use ipld_core::cid::Cid;
+    use tokio::{
+        fs::File,
+        io::{AsyncRead, AsyncSeek, AsyncSeekExt},
+        sync::RwLock,
+    };
+
+    use crate::{stores::to_blockstore_cid, CarExtractor, CidExt, Error};
+
+    // Methods in here are marked as unused in the "main" `impl` because they're only used here.
+    impl<R> CarExtractor<R>
+    where
+        R: AsyncRead + AsyncSeek + Unpin,
+    {
+        fn has(&self, cid: &Cid) -> bool {
+            if cid.get_identity_data().is_some() {
+                return true;
+            }
+            // Since we're using the naive index, if the Cid isn't in the index, there is no Cid inside
+            self.index.contains_key(cid)
+        }
+
+        async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>, Error> {
+            if let Some(identity_data) = cid.get_identity_data() {
+                return Ok(Some(identity_data.to_vec()));
+            }
+
+            match self.index.get(&cid) {
+                Some(metadata) => {
+                    // We could seek directly to the data and not read the Cid, but this is "canonical"
+                    self.reader
+                        .get_inner_mut()
+                        .seek(std::io::SeekFrom::Start(metadata.block_offset))
+                        .await?;
+                    let (_, block) = self.reader.read_block().await?;
+                    Ok(Some(block))
+                }
+                None => Ok(None),
+            }
+        }
+    }
+
+    /// A read-only [`blockstore::Blockstore`] implementation of [`CarExtractor`].
+    pub struct ReadOnlyBlockstore<R> {
+        inner: RwLock<CarExtractor<R>>,
+    }
+
+    impl<R> ReadOnlyBlockstore<R>
+    where
+        R: AsyncRead + AsyncSeek + Unpin + blockstore::cond_send::CondSync,
+    {
+        /// Create a new [`ReadOnlyBlockstore`] from the given reader.
+        pub async fn new(reader: R) -> Result<Self, Error> {
+            Ok(Self {
+                inner: RwLock::new(CarExtractor::new(reader).await?),
+            })
+        }
+    }
+
+    impl ReadOnlyBlockstore<File> {
+        /// Create a new [`ReadOnlyBlockstore`] from the given path.
+        pub async fn from_path<P>(path: P) -> Result<Self, Error>
+        where
+            P: AsRef<Path>,
+        {
+            Self::new(File::open(path).await?).await
+        }
+    }
+
+    impl<R> Deref for ReadOnlyBlockstore<R> {
+        type Target = RwLock<CarExtractor<R>>;
+
+        fn deref(&self) -> &Self::Target {
+            &self.inner
+        }
+    }
+
+    impl<R> Blockstore for ReadOnlyBlockstore<R>
+    where
+        R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
+    {
+        async fn get<const S: usize>(
+            &self,
+            cid: &ipld_core::cid::CidGeneric<S>,
+        ) -> blockstore::Result<Option<Vec<u8>>> {
+            let cid = to_blockstore_cid(cid)?;
+            self.inner
+                .write()
+                .await
+                .get(&cid)
+                .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string()))
+                .await
+        }
+
+        async fn has<const S: usize>(
+            &self,
+            cid: &ipld_core::cid::CidGeneric<S>,
+        ) -> blockstore::Result<bool> {
+            let cid = to_blockstore_cid(cid)?;
+            Ok(self.inner.read().await.has(&cid))
+        }
+
+        async fn put_keyed<const S: usize>(
+            &self,
+            _: &ipld_core::cid::CidGeneric<S>,
+            _: &[u8],
+        ) -> blockstore::Result<()> {
+            Err(blockstore::Error::FatalDatabaseError(format!(
+                "{} is read-only",
+                type_name::<Self>()
+            )))
+        }
+
+        async fn remove<const S: usize>(
+            &self,
+            _: &ipld_core::cid::CidGeneric<S>,
+        ) -> blockstore::Result<()> {
+            Err(blockstore::Error::FatalDatabaseError(format!(
+                "{} is read-only",
+                type_name::<Self>()
+            )))
+        }
+
+        async fn close(self) -> blockstore::Result<()> {
+            Ok(())
+        }
+    }
+
+    #[cfg(test)]
+    mod test {
+        use std::{str::FromStr, sync::Arc};
+
+        use ipld_core::cid::{multihash::Multihash, Cid};
+        use sha2::Sha256;
+        use tokio::fs::File;
+
+        use super::*;
+        use crate::{
+            multicodec::generate_multihash, test_utils::assert_buffer_eq, IDENTITY_CODE, RAW_CODE,
+        };
+
+        type FileBlockstore = ReadOnlyBlockstore<File>;
+
+        #[tokio::test]
+        async fn test_identity_cid() {
+            let blockstore = FileBlockstore::from_path("tests/fixtures/car_v2/spaceglenda.car")
+                .await
+                .unwrap();
+
+            let payload = b"Hello World!";
+            let multihash = Multihash::wrap(IDENTITY_CODE, payload).unwrap();
+            let identity_cid = Cid::new_v1(RAW_CODE, multihash);
+
+            let has_block = blockstore.has(&identity_cid).await.unwrap();
+            assert!(has_block);
+
+            let content = blockstore.get(&identity_cid).await.unwrap().unwrap();
+            assert_buffer_eq!(&payload, &content);
+        }
+
+        #[tokio::test(flavor = "multi_thread", worker_threads = 3)]
+        async fn test_parallel_readers() {
+            let blockstore = FileBlockstore::from_path("tests/fixtures/car_v2/spaceglenda.car")
+                .await
+                .unwrap();
+            let blockstore = Arc::new(blockstore);
+
+            // CIDs of the content blocks that the spaceglenda.car contains. We are
+            // only looking at the raw content so that our validation is easier later.
+            let cids = vec![
+                Cid::from_str("bafkreic6kcrue6ms42ykrisq6or24pbrubnyouvmgvk7ft73fjd4ynslxi")
+                    .unwrap(),
+                Cid::from_str("bafkreicvuc5rwwjqzix7saaia55du44qqsnphdugvjxlbe446mjmupekl4")
+                    .unwrap(),
+                Cid::from_str("bafkreiepxrkqexuff4vhc4vp6co73ubbp2vmskbwwazaihln6wws2z4wly")
+                    .unwrap(),
+            ];
+
+            // Request many blocks
+            let handles = (0..100)
+                .into_iter()
+                .map(|i| {
+                    let requested = cids[i % cids.len()];
+                    tokio::spawn({
+                        let blockstore = Arc::clone(&blockstore);
+                        async move {
+                            (
+                                requested,
+                                blockstore.get(&requested).await.unwrap().unwrap(),
+                            )
+                        }
+                    })
+                })
+                .collect::<Vec<_>>();
+
+            // Validate if the blocks received are correct
+            for handle in handles {
+                let (requested_cid, block_bytes) = handle.await.expect("Panic in task");
+
+                // Generate the CID from the bytes. That way we can check if the
+                // block data returned is correct.
+                let multihash = generate_multihash::<Sha256>(&block_bytes);
+                let generated_cid = Cid::new_v1(RAW_CODE, multihash);
+
+                assert_eq!(requested_cid, generated_cid);
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::{io::Cursor, path::Path};
 
-    use crate::CarExtractor;
+    use crate::{test_utils::assert_buffer_eq, CarExtractor};
 
     #[tokio::test]
     async fn read_duplicated_blocks() {
@@ -189,7 +406,7 @@ mod test {
         let inner = out_check.into_inner();
         let result = inner.as_slice();
 
-        assert_eq!(expected, result);
+        assert_buffer_eq!(expected, result);
     }
 
     async fn load_and_compare<P1, P2>(original: P1, path: P2)
diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs
index 37776f04a..7d9950673 100644
--- a/mater/lib/src/lib.rs
+++ b/mater/lib/src/lib.rs
@@ -1,8 +1,5 @@
 //! A library to handle CAR files.
 //! Both version 1 and version 2 are supported.
-//!
-//! You can make use of the lower-level utilities such as [`CarV2Reader`] to read a CARv2 file,
-//! though these utilities were designed to be used in higher-level abstractions, like the [`Blockstore`].
 
 #![warn(unused_crate_dependencies)]
 #![warn(missing_docs)]
@@ -33,6 +30,15 @@ pub use v2::{
     MultihashIndexSorted, Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer,
 };
 
+/// [`blockstore`] abstractions over CAR files.
+#[cfg(feature = "blockstore")]
+pub mod blockstore {
+    // Re-export the API so users don't need to add an extra crate in Cargo.toml
+    pub use blockstore::{Blockstore, Error};
+
+    pub use crate::file_reader::blockstore::ReadOnlyBlockstore;
+}
+
 /// CAR handling errors.
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs
index 804b18c97..71b802c70 100644
--- a/mater/lib/src/stores/file.rs
+++ b/mater/lib/src/stores/file.rs
@@ -267,10 +267,10 @@ impl FileBlockstore {
 
 #[cfg(feature = "blockstore")]
 mod blockstore {
-    use blockstore::{block::CidError, Blockstore, Error};
-    use ipld_core::cid::{Cid, CidGeneric};
+    use blockstore::{Blockstore, Error};
+    use ipld_core::cid::CidGeneric;
 
-    use crate::FileBlockstore;
+    use crate::{stores::to_blockstore_cid, FileBlockstore};
 
     impl Blockstore for FileBlockstore {
         async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>, Error> {
@@ -313,18 +313,6 @@ mod blockstore {
             .map_err(|err| Error::FatalDatabaseError(err.to_string()))
         }
     }
-
-    /// Convert CID with the generic Multihash size to the CID with the specific
-    /// Multihash size that the underlying blockstore expects.
-    fn to_blockstore_cid<const S: usize>(cid: &CidGeneric<S>) -> Result<Cid, Error> {
-        let digest_size = cid.hash().size() as usize;
-        let hash = cid
-            .hash()
-            .resize::<64>()
-            .map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?;
-
-        Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here"))
-    }
 }
 
 #[cfg(test)]
diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs
index 970cb75c7..389eae30d 100644
--- a/mater/lib/src/stores/mod.rs
+++ b/mater/lib/src/stores/mod.rs
@@ -43,3 +43,24 @@ impl Default for Config {
         }
     }
 }
+
+#[cfg(feature = "blockstore")]
+mod _blockstore {
+    use blockstore::{block::CidError, Error};
+    use ipld_core::cid::{Cid, CidGeneric};
+
+    /// Convert CID with the generic Multihash size to the CID with the specific
+    /// Multihash size that the underlying blockstore expects.
+    pub fn to_blockstore_cid<const S: usize>(cid: &CidGeneric<S>) -> Result<Cid, Error> {
+        let digest_size = cid.hash().size() as usize;
+        let hash = cid
+            .hash()
+            .resize::<64>()
+            .map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?;
+
+        Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here"))
+    }
+}
+
+#[cfg(feature = "blockstore")]
+pub(crate) use _blockstore::to_blockstore_cid;
diff --git a/mater/lib/src/v1/mod.rs b/mater/lib/src/v1/mod.rs
index c949d8716..63d0f93df 100644
--- a/mater/lib/src/v1/mod.rs
+++ b/mater/lib/src/v1/mod.rs
@@ -81,7 +81,7 @@ impl Header {
     }
 
     /// Returns the encoded length of the header, including the VarInt size prefix.
-    /// The size of the [`Header`] when encoded using [`DagCborCodec`].
+    /// The size of the [`Header`] when encoded using [`DagCborCodec`](serde_ipld_dagcbor::codec::DagCborCodec).
     ///
     /// The formula is: `overhead + 41 * roots.len()`.
     /// It is based on reversing the CBOR encoding, see an example:
diff --git a/storage-retrieval/cli/src/main.rs b/storage-retrieval/cli/src/main.rs
index 185fbb7b1..6f9e38e26 100644
--- a/storage-retrieval/cli/src/main.rs
+++ b/storage-retrieval/cli/src/main.rs
@@ -71,7 +71,7 @@ fn setup_tracing() -> Result<(), FromEnvError> {
                 .with_default_directive(if cfg!(debug_assertions) {
                     LevelFilter::DEBUG.into()
                 } else {
-                    LevelFilter::WARN.into()
+                    LevelFilter::INFO.into()
                 })
                 .from_env()?,
         ),
diff --git a/storage-retrieval/lib/examples/simple_server.rs b/storage-retrieval/lib/examples/simple_server.rs
index bcec600f4..649152cf0 100644
--- a/storage-retrieval/lib/examples/simple_server.rs
+++ b/storage-retrieval/lib/examples/simple_server.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 
 use anyhow::Result;
 use libp2p::Multiaddr;
-use mater::FileBlockstore;
+use mater::blockstore::ReadOnlyBlockstore;
 use polka_storage_retrieval::server::Server;
 
 #[tokio::main]
@@ -16,13 +16,18 @@ async fn main() -> Result<()> {
 
     // Example blockstore providing only a single file.
     let blockstore = Arc::new(
-        FileBlockstore::from_existing("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car")
+        ReadOnlyBlockstore::from_path("./mater/lib/tests/fixtures/car_v2/spaceglenda_wrapped.car")
             .await?,
     );
 
+    let roots = blockstore.write().await.roots().await?;
+    tracing::info!("available roots: {:?}", roots);
+
     // Setup & run the server
     let server = Server::new(blockstore)?;
     let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
+    tracing::info!(multiaddress = %listener);
+
     server.run(vec![listener], std::future::pending()).await?;
 
     Ok(())
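
Usage sketch (illustrative, not part of the diff above): with the `blockstore` feature enabled, the new `mater::blockstore::ReadOnlyBlockstore` can be driven through the re-exported `Blockstore` trait, or, via `Deref`, through the inner `CarExtractor`. The CAR path below is hypothetical.

    use mater::blockstore::{Blockstore, ReadOnlyBlockstore};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Open a CARv2 file read-only; the naive index is built up front.
        let store = ReadOnlyBlockstore::from_path("example.car").await?;

        // `Deref` exposes the inner `RwLock<CarExtractor<_>>`, so lower-level
        // calls such as `roots()` remain reachable (as in the example server).
        let roots = store.write().await.roots().await?;

        // Read a block through the `Blockstore` trait; writes are rejected.
        if let Some(root) = roots.first() {
            let block = store.get(root).await?;
            println!("root {root}: {:?} bytes", block.map(|b| b.len()));
        }

        Ok(())
    }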