diff --git a/Cargo.lock b/Cargo.lock index 24603ff2c..a05f2678c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4842,7 +4842,7 @@ checksum = "3a82608ee96ce76aeab659e9b8d3c2b787bffd223199af88c674923d861ada10" dependencies = [ "execute-command-macro", "execute-command-tokens", - "generic-array 1.1.1", + "generic-array 1.1.0", ] [[package]] @@ -6084,9 +6084,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "1.1.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb8bc4c28d15ade99c7e90b219f30da4be5c88e586277e8cbe886beeb868ab2" +checksum = "96512db27971c2c3eece70a1e106fbe6c87760234e31e8f7e5634912fe52794a" dependencies = [ "typenum", ] @@ -9100,6 +9100,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tracing", ] [[package]] @@ -12084,7 +12085,7 @@ dependencies = [ "frame-benchmarking 28.0.0", "frame-support 28.0.0", "frame-system 28.0.0", - "generic-array 1.1.1", + "generic-array 1.1.0", "hex", "log", "num-bigint", @@ -13920,9 +13921,11 @@ dependencies = [ name = "polka-storage-provider-server" version = "0.1.0" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.22.1", + "bytes", "chrono", "ciborium", "cid 0.11.1", diff --git a/mater/cli/src/convert.rs b/mater/cli/src/convert.rs index 67c3a64fd..fb69cf6f9 100644 --- a/mater/cli/src/convert.rs +++ b/mater/cli/src/convert.rs @@ -9,6 +9,7 @@ use crate::error::Error; pub(crate) async fn convert_file_to_car( input_path: &PathBuf, output_path: &PathBuf, + config: Config, overwrite: bool, ) -> Result { let source_file = File::open(input_path).await?; @@ -17,7 +18,7 @@ pub(crate) async fn convert_file_to_car( } else { File::create_new(output_path).await }?; - let cid = create_filestore(source_file, output_file, Config::default()).await?; + let cid = create_filestore(source_file, output_file, config).await?; Ok(cid) } @@ -29,14 +30,12 @@ mod tests { use std::str::FromStr; use anyhow::Result; - use mater::Cid; + use mater::{Cid, Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; use tempfile::tempdir; use tokio::{fs::File, io::AsyncWriteExt}; - use crate::{convert::convert_file_to_car, error::Error}; - #[tokio::test] - async fn convert_file_to_car_success() -> Result<()> { + async fn convert_file_to_car_raw_success() -> Result<()> { // Setup: Create a dummy input file let temp_dir = tempdir()?; let input_path = temp_dir.path().join("test_input.txt"); @@ -49,8 +48,11 @@ mod tests { // Define output path let output_path = temp_dir.path().join("test_output.car"); + // Configure in raw mode + let config = Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH); + // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + let result = super::convert_file_to_car(&input_path, &output_path, config, false).await; // Assert the result is Ok assert!(result.is_ok()); @@ -73,15 +75,11 @@ mod tests { // Define output path let output_path = temp_dir.path().join("test_output.car"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; - - // Assert the result is an error - assert!(result.is_err()); - assert!(matches!(result, Err(Error::IoError(..)))); + let config = Config::default(); - // Close temporary directory - temp_dir.close()?; + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config, false).await; + assert!(matches!(result, Err(super::Error::IoError(..)))); Ok(()) } @@ -94,21 +92,23 @@ mod tests { let mut 
input_file = File::create(&input_path).await?; tokio::io::AsyncWriteExt::write_all(&mut input_file, b"test data").await?; - // Create output file + // Create output file so that the file already exists. + // Since we are not allowing overwrites (overwrite = false), this should trigger an error. let output_path = temp_dir.path().join("output_file"); File::create_new(&output_path).await?; println!("gets here"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + // Provide a configuration (using default in this example). + let config = Config::default(); - // Assert the result is an error + // Call the function under test with the config and overwrite flag set to false. + let result = super::convert_file_to_car(&input_path, &output_path, config, false).await; + + // Assert the result is an error, specifically an IoError. assert!(result.is_err()); - assert!(matches!(result, Err(Error::IoError(..)))); + assert!(matches!(result, Err(super::Error::IoError(..)))); - // Close temporary directory temp_dir.close()?; - Ok(()) } } diff --git a/mater/cli/src/main.rs b/mater/cli/src/main.rs index ec00f6068..2baf89e16 100644 --- a/mater/cli/src/main.rs +++ b/mater/cli/src/main.rs @@ -1,9 +1,9 @@ use std::path::PathBuf; use clap::Parser; +use mater::{Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; use crate::{convert::convert_file_to_car, error::Error, extract::extract_file_from_car}; - mod convert; mod error; mod extract; @@ -26,14 +26,24 @@ enum MaterCli { #[arg(short, long, action)] quiet: bool, - /// If enabled, the output will overwrite any existing files. + /// If enabled, content will be stored directly without UnixFS wrapping. #[arg(long, action)] - overwrite: bool, + raw: bool, + + /// Size of each chunk in bytes. + #[arg(long, default_value_t = DEFAULT_CHUNK_SIZE)] + chunk_size: usize, + + /// Maximum number of children per parent node. 
+ #[arg(long, default_value_t = DEFAULT_TREE_WIDTH)] + tree_width: usize, }, + /// Convert a CARv2 file to its original format Extract { /// Path to CARv2 file input_path: PathBuf, + /// Path to output file output_path: Option<PathBuf>, }, @@ -46,14 +56,20 @@ async fn main() -> Result<(), Error> { input_path, output_path, quiet, - overwrite, + raw, + chunk_size, + tree_width, } => { let output_path = output_path.unwrap_or_else(|| { let mut new_path = input_path.clone(); new_path.set_extension("car"); new_path }); - let cid = convert_file_to_car(&input_path, &output_path, overwrite).await?; + + // Build config with UnixFS wrapping by default + let config = Config::balanced(chunk_size, tree_width, raw); + + let cid = convert_file_to_car(&input_path, &output_path, config, false).await?; if quiet { println!("{}", cid); @@ -75,14 +91,12 @@ async fn main() -> Result<(), Error> { new_path }); extract_file_from_car(&input_path, &output_path).await?; println!( - "Successfully converted CARv2 file {} and saved it to to {}", + "Successfully converted CARv2 file {} and saved it to {}", input_path.display(), output_path.display() ); } } - Ok(()) } diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml index 0191772cf..edf04e20a 100644 --- a/mater/lib/Cargo.toml +++ b/mater/lib/Cargo.toml @@ -29,6 +29,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] } tokio-stream.workspace = true tokio-util = { workspace = true, features = ["io"] } +tracing = { workspace = true } # Optional dependencies blockstore = { workspace = true, optional = true } diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs index 739d237c1..ba28df75f 100644 --- a/mater/lib/src/lib.rs +++ b/mater/lib/src/lib.rs @@ -18,16 +18,138 @@ mod unixfs; mod v1; mod v2; -// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`. +use std::{ + collections::{HashMap, HashSet}, + io::SeekFrom, +}; + pub use ipld_core::cid::Cid; +use ipld_core::codec::Codec; +use ipld_dagpb::DagPbCodec; pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE}; -pub use stores::{create_filestore, Blockstore, Config, FileBlockstore}; +pub use stores::{ + create_filestore, Blockstore, Config, FileBlockstore, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH, +}; +use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; pub use v2::{ verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, MultihashIndexSorted, Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer, }; +/// Represents the location and size of a block in the CAR file. +pub struct BlockLocation { + /// The byte offset in the CAR file where the block starts. + pub offset: u64, + /// The size (in bytes) of the block. + pub size: u64, +} + +/// A simple blockstore backed by a CAR file and its index. +pub struct CarBlockStore<R> { + reader: R, + /// Mapping from CID to block location. + pub index: HashMap<Cid, BlockLocation>, +} + +impl<R> CarBlockStore<R> +where + R: AsyncSeekExt + AsyncReadExt + Unpin, +{ + /// Extract content by traversing the UnixFS DAG using the index. + pub async fn extract_content_via_index<W>( + &mut self, + root: &Cid, + output: &mut W, + ) -> Result<(), Error> + where + W: AsyncWriteExt + Unpin, + { + // To avoid processing a block more than once. + let mut processed = HashSet::new(); + // We use a stack for DFS traversal.
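// Illustrative example (hypothetical CIDs): if the root links to [A, B] and A
// links to [A1, A2], the loop below visits root, B, A, A2, A1. Links are pushed
// in order and popped last-in, first-out, so the most recently discovered child
// is handled first, while `processed` guards against visiting a block twice.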
+ let mut to_process = vec![*root]; + + while let Some(current_cid) = to_process.pop() { + if processed.contains(&current_cid) { + continue; + } + processed.insert(current_cid); + + // Retrieve block by CID via the index. + let block_bytes = self.get_block(&current_cid).await?; + + // Write the raw block data. In a real UnixFS traversal you might need + // to reconstruct file content in order. + output.write_all(&block_bytes).await?; + + // If the block is a DAG-PB node, decode and enqueue its children. + if current_cid.codec() == crate::multicodec::DAG_PB_CODE { + let mut cursor = std::io::Cursor::new(&block_bytes); + // Propagate any error that occurs during decoding. + let pb_node: ipld_dagpb::PbNode = + DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?; + for link in pb_node.links { + if !processed.contains(&link.cid) { + to_process.push(link.cid); + } + } + } + } + + Ok(()) + } +} + +impl<R> CarBlockStore<R> +where + R: AsyncSeek + AsyncReadExt + Unpin, +{ + /// Given a reader positioned at the start of a CAR file, + /// load the CARv2 index and build a mapping of CID -> (offset, size). + /// For simplicity, assume the CAR header has been read and the index offset is known. + pub async fn load_index( + mut reader: R, + index_offset: u64, + ) -> Result<HashMap<Cid, BlockLocation>, Error> { + // Seek to the start of the index. + reader.seek(SeekFrom::Start(index_offset)).await?; + // Parse the index according to the CARv2 spec. For demonstration, + // we assume a very simple format where each index entry is: + // [CID length (u8)][CID bytes][offset (u64)][size (u64)] + let mut index = HashMap::new(); + // In a real implementation you’d read until EOF or index length. + // Here we use a simple loop: + loop { + let cid_len = match reader.read_u8().await { + Ok(n) => n as usize, + Err(_) => break, + }; + let mut cid_buf = vec![0u8; cid_len]; + reader.read_exact(&mut cid_buf).await?; + let cid = Cid::try_from(cid_buf).map_err(|e| Error::Other(e.to_string()))?; + + let offset = reader.read_u64_le().await?; + let size = reader.read_u64_le().await?; + index.insert(cid, BlockLocation { offset, size }); + } + Ok(index) + } + + /// Retrieve a block by its CID. This method uses the in-memory index + /// to seek directly to the block’s location. + pub async fn get_block(&mut self, cid: &Cid) -> Result<Vec<u8>, Error> { + if let Some(location) = self.index.get(cid) { + self.reader.seek(SeekFrom::Start(location.offset)).await?; + let mut buf = vec![0u8; location.size as usize]; + self.reader.read_exact(&mut buf).await?; + Ok(buf) + } else { + Err(Error::BlockNotFound(cid.to_string())) + } + } +} + /// CAR handling errors. #[derive(Debug, thiserror::Error)] pub enum Error { @@ -111,6 +233,14 @@ pub enum Error { /// See [`DagPbError`](ipld_dagpb::Error) for more information. #[error(transparent)] DagPbError(#[from] ipld_dagpb::Error), + + /// Catch-all error for miscellaneous cases. + #[error("other error: {0}")] + Other(String), + + /// Error indicating that the requested block could not be found in the CAR file's index.
+ #[error("block not found: {0}")] + BlockNotFound(String), } #[cfg(test)] diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 3b6bb66f3..a5c06494e 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -12,7 +12,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; -use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH}; +use super::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; use crate::{ multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, @@ -76,7 +76,7 @@ impl Blockstore { root: None, blocks: IndexMap::new(), indexed: HashSet::new(), - chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE), + chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE), tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH), } } @@ -206,7 +206,7 @@ impl Default for Blockstore { root: None, blocks: IndexMap::new(), indexed: HashSet::new(), - chunk_size: DEFAULT_BLOCK_SIZE, + chunk_size: DEFAULT_CHUNK_SIZE, tree_width: DEFAULT_TREE_WIDTH, } } diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs index b8e494a21..87c2493d1 100644 --- a/mater/lib/src/stores/filestore.rs +++ b/mater/lib/src/stores/filestore.rs @@ -1,15 +1,26 @@ use bytes::BytesMut; -use futures::stream::StreamExt; +use digest::Digest; +use futures::StreamExt; use ipld_core::cid::Cid; -use sha2::{Digest, Sha256}; +use sha2::Sha256; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tracing::trace; use super::Config; use crate::{ - multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, - Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, + multicodec::SHA_256_CODE, + unixfs::{stream_balanced_tree, stream_balanced_tree_unixfs}, + CarV1Header, CarV2Header, CarV2Writer, Error, Index, IndexEntry, MultihashIndexSorted, + SingleWidthIndex, }; +/// Converts a source stream into a CARv2 file and writes it to an output stream. +// +// The expanded trait bounds are required because: +// - `Send + 'static`: The async stream operations require the ability to move the source/output +// between threads and ensure they live long enough for the entire async operation +// - `AsyncSeek`: Required for the output to write the final header after processing all blocks +// - `Unpin`: Required because we need to move the source/output around during async operations async fn balanced_import( mut source: Src, mut output: Out, @@ -45,14 +56,10 @@ where // this means there is no right way of knowing when the reader is fully exhausted! // If we need to support a case like that, we just need to track how many times // the reader returned 0 and break at a certain point - if source.read_buf(&mut buf).await? 
== 0 { - // EOF but there's still content to yield -> yield it - if buf.len() > 0 { - let chunk = buf.split(); - yield chunk.freeze(); - } - break - } else if buf.len() >= chunk_size { + let read_bytes = source.read_buf(&mut buf).await?; + trace!(bytes_read = read_bytes, buffer_size = buf.len(), "Buffer read status"); + // Yield every full chunk currently sitting in the buffer. + while buf.len() >= chunk_size { // The buffer may have a larger capacity than chunk_size due to reserve // this also means that our read may have read more bytes than we expected, // thats why we check if the length if bigger than the chunk_size and if so @@ -60,6 +67,14 @@ where let chunk = buf.split_to(chunk_size); yield chunk.freeze(); } // otherwise, the buffer is not full, so we don't do a thing + + if read_bytes == 0 && !buf.is_empty() { + let chunk = buf.split(); + yield chunk.freeze(); + break; + } else if read_bytes == 0 { + break; + } } }; @@ -78,10 +93,13 @@ where let mut root = None; let mut entries = vec![]; + while let Some(node) = nodes.next().await { let (node_cid, node_bytes) = node?; - let digest = node_cid.hash().digest().to_owned(); - let entry = IndexEntry::new(digest, (position - car_v1_start) as u64); + let entry = IndexEntry::new( + node_cid.hash().digest().to_vec(), + (position - car_v1_start) as u64, + ); entries.push(entry); position += writer.write_block(&node_cid, &node_bytes).await?; @@ -90,6 +108,86 @@ where } } + let index_offset = position; + let single_width_index = + SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries); + let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( + SHA_256_CODE, + single_width_index.into(), + )); + writer.write_index(&index).await?; + + writer.get_inner_mut().rewind().await?; + let header = CarV2Header::new( + false, + car_v1_start.try_into().unwrap(), + (index_offset - car_v1_start).try_into().unwrap(), + index_offset.try_into().unwrap(), + ); + writer.write_header(&header).await?; + + let header_v1 = CarV1Header::new(vec![root.unwrap()]); + writer.write_v1_header(&header_v1).await?; + + writer.finish().await?; + + Ok(root.unwrap()) +} + +async fn balanced_import_unixfs<Src, Out>( + mut source: Src, + mut output: Out, + chunk_size: usize, + tree_width: usize, +) -> Result<Cid, Error> +where + Src: AsyncRead + Unpin + Send + 'static, + Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static, +{ + let chunker = async_stream::try_stream!
{ + let mut buf = BytesMut::with_capacity(chunk_size); + loop { + let read_bytes = source.read_buf(&mut buf).await?; + while buf.len() >= chunk_size { + let chunk = buf.split_to(chunk_size); + yield chunk.freeze(); + } + + if read_bytes == 0 && !buf.is_empty() { + let chunk = buf.split(); + yield chunk.freeze(); + break; + } else if read_bytes == 0 { + break; + } + } + }; + + let nodes = stream_balanced_tree_unixfs(chunker, tree_width).peekable(); + tokio::pin!(nodes); + + let mut writer = CarV2Writer::new(&mut output); + let mut position = 0; + + position += writer.write_header(&CarV2Header::default()).await?; + let car_v1_start = position; + position += writer.write_v1_header(&CarV1Header::default()).await?; + + let mut root = None; + let mut entries = vec![]; + + while let Some(node) = nodes.next().await { + let (node_cid, node_bytes) = node?; + let entry = IndexEntry::new( + node_cid.hash().digest().to_vec(), + (position - car_v1_start) as u64, + ); + entries.push(entry); + position += writer.write_block(&node_cid, &node_bytes).await?; + + root = Some(node_cid); + } + let Some(root) = root else { return Err(Error::EmptyRootsError); }; @@ -130,14 +228,21 @@ pub async fn create_filestore( config: Config, ) -> Result where - Src: AsyncRead + Unpin, - Out: AsyncWrite + AsyncSeek + Unpin, + Src: AsyncRead + Unpin + Send + 'static, + Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static, { match config { Config::Balanced { chunk_size, tree_width, - } => balanced_import(source, output, chunk_size, tree_width).await, + raw_mode, + } => { + if raw_mode { + balanced_import(source, output, chunk_size, tree_width).await + } else { + balanced_import_unixfs(source, output, chunk_size, tree_width).await + } + } } } @@ -145,13 +250,13 @@ where mod test { use std::path::Path; + use ipld_core::codec::Codec; + use quick_protobuf::MessageRead; use tempfile::tempdir; use tokio::fs::File; - use crate::{ - stores::{filestore::create_filestore, Config}, - test_utils::assert_buffer_eq, - }; + use super::*; + use crate::{test_utils::assert_buffer_eq, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; async fn test_filestore_roundtrip(original: P1, expected: P2) where @@ -163,7 +268,8 @@ mod test { let source_file = File::open(original).await.unwrap(); let output_file = File::create(&temp_path).await.unwrap(); - create_filestore(source_file, output_file, Config::default()) + let config = Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH); + create_filestore(source_file, output_file, config) .await .unwrap(); @@ -174,7 +280,7 @@ mod test { } #[tokio::test] - async fn test_filestore_lorem() { + async fn test_lorem_roundtrip() { test_filestore_roundtrip( "tests/fixtures/original/lorem.txt", "tests/fixtures/car_v2/lorem.car", @@ -183,11 +289,116 @@ mod test { } #[tokio::test] - async fn test_filestore_spaceglenda() { + async fn test_spaceglenda_roundtrip() { test_filestore_roundtrip( "tests/fixtures/original/spaceglenda.jpg", "tests/fixtures/car_v2/spaceglenda.car", ) .await } + + #[tokio::test] + async fn test_filestore_unixfs_dag_structure() { + use rand::{thread_rng, Rng}; + + const TEST_CHUNK_SIZE: usize = 64 * 1024; // 64 KiB + const TEST_TREE_WIDTH: usize = 2; + + let temp_dir = tempdir().unwrap(); + let input_path = temp_dir.path().join("input.bin"); + let temp_path = temp_dir.path().join("temp.car"); + + // Create test file with random data to ensure unique chunks. 
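As a usage sketch of the `create_filestore` dispatch above: the `raw_mode` flag chosen through `Config::balanced` is what selects between the raw importer and the UnixFS importer. The file paths, function name, and boxed error type below are illustrative only.

    use mater::{create_filestore, Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};
    use tokio::fs::File;

    // Packs `input` into a CARv2 file at `output`; `raw` mirrors the new CLI flag.
    async fn pack(input: &str, output: &str, raw: bool) -> Result<mater::Cid, Box<dyn std::error::Error>> {
        let source = File::open(input).await?;
        let target = File::create(output).await?;
        // `balanced(chunk_size, tree_width, raw_mode)`: raw_mode = false keeps UnixFS wrapping.
        let config = Config::balanced(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH, raw);
        Ok(create_filestore(source, target, config).await?)
    }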
+ let mut rng = thread_rng(); + let test_data = (0..512 * 1024).map(|_| rng.gen::()).collect::>(); + + trace!("Creating test file of size: {} bytes", test_data.len()); + tokio::fs::write(&input_path, &test_data).await.unwrap(); + + let source_file = File::open(&input_path).await.unwrap(); + let output_file = File::create(&temp_path).await.unwrap(); + + let config = Config::balanced_unixfs(TEST_CHUNK_SIZE, TEST_TREE_WIDTH); + + let root_cid = create_filestore(source_file, output_file, config) + .await + .unwrap(); + trace!("Root CID: {}", root_cid); + + // Read back and verify structure. + let file = File::open(&temp_path).await.unwrap(); + let mut reader = crate::CarV2Reader::new(file); + + reader.read_pragma().await.unwrap(); + reader.read_header().await.unwrap(); + reader.read_v1_header().await.unwrap(); + + // Track all unique blocks and statistics. + let mut unique_blocks = std::collections::HashSet::new(); + let mut leaf_blocks = std::collections::HashSet::new(); + let mut parent_blocks = std::collections::HashSet::new(); + let mut level_sizes = Vec::new(); + let mut current_level_nodes = std::collections::HashSet::new(); + let mut current_level = 0; + + while let Ok((cid, data)) = reader.read_block().await { + unique_blocks.insert(cid); + + let pb_node: ipld_dagpb::PbNode = ipld_dagpb::DagPbCodec::decode(&data[..]).unwrap(); + let mut proto_reader = + quick_protobuf::BytesReader::from_bytes(&pb_node.data.clone().unwrap()); + let bytes = &pb_node.data.unwrap(); + let unixfs_data = crate::unixfs::Data::from_reader(&mut proto_reader, bytes).unwrap(); + + if pb_node.links.is_empty() { + leaf_blocks.insert(cid); + trace!("Found leaf node: {} (size: {})", cid, data.len()); + trace!( + " Data size: {}", + unixfs_data.Data.as_ref().map_or(0, |d| d.len()) + ); + trace!(" Blocksizes: {:?}", unixfs_data.blocksizes); + + // New level if this is first leaf. + if current_level_nodes.is_empty() { + level_sizes.push(0); + current_level = level_sizes.len() - 1; + } + } else { + parent_blocks.insert(cid); + + trace!( + "Found parent node: {} with {} links (size: {})", + cid, + pb_node.links.len(), + data.len() + ); + trace!(" Total filesize: {:?}", unixfs_data.filesize); + trace!(" Blocksizes: {:?}", unixfs_data.blocksizes); + + // Track level changes. + if !current_level_nodes.is_empty() + && current_level_nodes + .iter() + .any(|n| pb_node.links.iter().any(|l| l.cid == *n)) + { + level_sizes.push(0); + current_level = level_sizes.len() - 1; + current_level_nodes.clear(); + } + } + + level_sizes[current_level] += 1; + current_level_nodes.insert(cid); + } + + // Verify structure. + assert!(!leaf_blocks.is_empty(), "No leaf nodes found"); + assert!(!parent_blocks.is_empty(), "No parent nodes found"); + assert_eq!( + unique_blocks.len(), + leaf_blocks.len() + parent_blocks.len(), + "Block count mismatch" + ); + } } diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs index 66920d97d..e56b94675 100644 --- a/mater/lib/src/stores/mod.rs +++ b/mater/lib/src/stores/mod.rs @@ -6,15 +6,17 @@ pub use blockstore::Blockstore; pub use file::FileBlockstore; pub use filestore::create_filestore; -/// The default block size, as defined in -/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13). 
-pub(crate) const DEFAULT_BLOCK_SIZE: usize = 1024 * 256; +/// The default chunk size for balanced trees (256 KiB) +/// Reference: https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13 +pub const DEFAULT_CHUNK_SIZE: usize = 256 * 1024; -/// The default tree width, also called links per block, as defined in -/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30). -pub(crate) const DEFAULT_TREE_WIDTH: usize = 174; +/// The default number of children per parent node in balanced trees. +/// This value comes from the go-ipfs implementation and provides a good balance +/// between tree depth and width for most use cases. +pub const DEFAULT_TREE_WIDTH: usize = 174; -/// Store configuration options. +/// Store configuration options for controlling how data is stored and structured. +#[derive(Debug, Clone)] pub enum Config { /// The store should use the balanced tree layout, /// generating byte chunks of `chunk_size` and @@ -24,24 +26,95 @@ pub enum Config { chunk_size: usize, /// The number of children per parent node. tree_width: usize, + + /// If true, store content directly without UnixFS metadata. + raw_mode: bool, }, } impl Config { - /// Create a new [`Config::Balanced`]. - pub fn balanced(chunk_size: usize, tree_width: usize) -> Self { + /// Creates a new balanced tree configuration with the specified parameters. + /// + /// # Arguments + /// * `chunk_size` - Size of each data chunk in bytes + /// * `tree_width` - Maximum number of children per parent node + /// * `raw_mode` - Whether to store content directly without UnixFS wrapping. Raw is more space efficient but loses IPFS compatibility features. Default is false (uses UnixFS wrapping). + pub fn balanced(chunk_size: usize, tree_width: usize, raw_mode: bool) -> Self { Self::Balanced { chunk_size, tree_width, + raw_mode, } } + + /// Creates a new balanced tree configuration with UnixFS wrapping (recommended). + pub fn balanced_unixfs(chunk_size: usize, tree_width: usize) -> Self { + Self::balanced(chunk_size, tree_width, false) + } + + /// Creates a new balanced tree configuration with raw storage. 
+ pub fn balanced_raw(chunk_size: usize, tree_width: usize) -> Self { + Self::balanced(chunk_size, tree_width, true) + } } impl Default for Config { fn default() -> Self { Self::Balanced { - chunk_size: DEFAULT_BLOCK_SIZE, + chunk_size: DEFAULT_CHUNK_SIZE, tree_width: DEFAULT_TREE_WIDTH, + raw_mode: false, // Default to UnixFS wrapping for IPFS compatibility + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = Config::default(); + match config { + Config::Balanced { + chunk_size, + tree_width, + raw_mode, + } => { + assert_eq!(chunk_size, DEFAULT_CHUNK_SIZE); + assert_eq!(tree_width, DEFAULT_TREE_WIDTH); + assert!(!raw_mode); + } } } + + #[test] + fn test_balanced_unixfs_config_builder() { + let chunk_size = 1024; + let tree_width = 10; + + let Config::Balanced { + chunk_size: cs, + tree_width: tw, + raw_mode, + } = Config::balanced_unixfs(chunk_size, tree_width); + assert_eq!(cs, chunk_size); + assert_eq!(tw, tree_width); + assert!(!raw_mode); + } + + #[test] + fn test_balanced_raw_config_builder() { + let chunk_size = 1024; + let tree_width = 10; + + let Config::Balanced { + chunk_size: cs, + tree_width: tw, + raw_mode, + } = Config::balanced_raw(chunk_size, tree_width); + assert_eq!(cs, chunk_size); + assert_eq!(tw, tree_width); + assert!(raw_mode); + } } diff --git a/mater/lib/src/unixfs/mod.rs b/mater/lib/src/unixfs/mod.rs index 1a84cfa66..ee61d5a0c 100644 --- a/mater/lib/src/unixfs/mod.rs +++ b/mater/lib/src/unixfs/mod.rs @@ -2,17 +2,16 @@ //! . mod unixfs_pb; - use std::collections::VecDeque; use async_stream::try_stream; use bytes::Bytes; -use futures::TryStreamExt; +use futures::{Stream, StreamExt, TryStreamExt}; use ipld_core::{cid::Cid, codec::Codec}; use ipld_dagpb::{DagPbCodec, PbLink, PbNode}; use quick_protobuf::MessageWrite; use sha2::Sha256; -use tokio_stream::{Stream, StreamExt}; +pub use unixfs_pb::{mod_Data, Data}; use crate::{ multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, @@ -20,7 +19,7 @@ use crate::{ }; #[derive(Debug, Clone, Copy)] -pub(crate) struct LinkInfo { +struct LinkInfo { raw_data_length: u64, encoded_data_length: u64, } @@ -41,7 +40,105 @@ enum TreeNode { } impl TreeNode { - fn encode(self) -> Result<((Cid, Bytes), LinkInfo), Error> { + fn encode_unixfs_leaf_node(chunk: &Bytes) -> Result<((Cid, Bytes), LinkInfo), Error> { + let chunk_len = chunk.len() as u64; + + // Build UnixFS metadata + let unixfs_data = Data { + Type: mod_Data::DataType::File, + filesize: Some(chunk_len), + blocksizes: vec![chunk_len], + Data: Some(chunk.to_vec().into()), + hashType: None, + fanout: None, + }; + + // Encode UnixFS data and create DAG-PB node + let mut data_buf = Vec::new(); + { + let mut w = quick_protobuf::Writer::new(&mut data_buf); + unixfs_data.write_message(&mut w)?; + } + + let pb_node = PbNode { + links: vec![], + data: Some(data_buf.clone().into()), + }; + + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + + let info = LinkInfo { + raw_data_length: chunk_len, + encoded_data_length: encoded.len() as u64, + }; + + Ok(((cid, encoded.into()), info)) + } + + fn encode_unixfs_stem_node( + children: Vec<(Cid, LinkInfo)>, + ) -> Result<((Cid, Bytes), LinkInfo), Error> { + // Process all children in a single pass, gathering totals and building links and blocksizes + let (total_raw_size, total_encoded_size, pb_links, blocksizes) = children.iter().fold( + ( + 0u64, + 0u64, + Vec::with_capacity(children.len()), + 
Vec::with_capacity(children.len()), + ), + |(raw_sum, encoded_sum, mut links, mut sizes), (child_cid, link_info)| { + sizes.push(link_info.raw_data_length); + links.push(PbLink { + cid: *child_cid, + name: Some("".to_string()), + size: Some(link_info.encoded_data_length), + }); + ( + raw_sum + link_info.raw_data_length, + encoded_sum + link_info.encoded_data_length, + links, + sizes, + ) + }, + ); + + // Create UnixFS metadata + let unixfs_data = Data { + Type: mod_Data::DataType::File, + filesize: Some(total_raw_size), + blocksizes, + Data: None, + hashType: None, + fanout: None, + }; + + // Encode UnixFS data + let mut data_buf = Vec::new(); + { + let mut w = quick_protobuf::Writer::new(&mut data_buf); + unixfs_data.write_message(&mut w)?; + } + + // Create DAG-PB node + let pb_node = PbNode { + links: pb_links, + data: Some(data_buf.clone().into()), + }; + + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + + let info = LinkInfo { + raw_data_length: data_buf.len() as u64, + encoded_data_length: encoded.len() as u64 + total_encoded_size, + }; + + Ok(((cid, encoded.into()), info)) + } + fn encode_raw(self) -> Result<((Cid, Bytes), LinkInfo), Error> { match self { TreeNode::Leaf(bytes) => { let data_length = bytes.len() as u64; @@ -261,9 +358,9 @@ where let input = input .err_into::() - // The TreeNode::Leaf(data).encode() just wraps it with a Cid marking the payload as Raw + // The TreeNode::Leaf(data).encode_raw() just wraps it with a Cid marking the payload as Raw // we may be able move this responsibility to the caller for more efficient memory usage - .map(|data| data.and_then(|data| TreeNode::Leaf(data).encode())) + .map(|data| data.and_then(|data| TreeNode::Leaf(data).encode_raw())) .err_into::(); tokio::pin!(input); @@ -293,7 +390,7 @@ where // it's most likely less performant (I didn't measure) // due to the different nature of the approaches (batch vs iterator) let links = std::mem::replace(&mut tree[level], Vec::with_capacity(width)); - let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; + let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode_raw()?; yield block; tree[level + 1].push((cid, link_info)); @@ -317,7 +414,7 @@ where // Once `input` is exhausted, we need to perform cleanup of any leftovers, // to do so, we start by popping levels from the front and building stems over them. while let Some(links) = tree.pop_front() { - let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; + let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode_raw()?; yield block; // If there's still a level in the front, it means the stem we just built will have a parent @@ -331,6 +428,59 @@ where } } +pub fn stream_balanced_tree_unixfs( + input: I, + width: usize, +) -> impl Stream> +where + I: Stream>, +{ + try_stream! 
{ + let mut tree: VecDeque> = VecDeque::new(); + tree.push_back(vec![]); + + tokio::pin!(input); + + while let Some(data) = input.next().await { + let data = data?; + let (block @ (cid, _), link_info) = TreeNode::encode_unixfs_leaf_node(&data)?; + yield block; + + tree[0].push((cid, link_info)); + + // Build parent nodes when necessary + for level in 0..tree.len() { + if tree[level].len() < width { + break; + } + + let links = std::mem::replace(&mut tree[level], Vec::new()); + let (block @ (cid, _), link_info) = TreeNode::encode_unixfs_stem_node(links)?; + yield block; + + if level + 1 == tree.len() { + tree.push_back(vec![]); + } + tree[level + 1].push((cid, link_info)); + } + } + + // Finalize tree: Flush remaining levels + while let Some(links) = tree.pop_front() { + if links.is_empty() { + continue; + } + + let (block @ (cid, _), link_info) = TreeNode::encode_unixfs_stem_node(links)?; + yield block; + + if let Some(next_level) = tree.front_mut() { + next_level.push((cid, link_info)); + } + } + } +} + #[cfg(test)] mod tests { //! Tests were taken from [beetle][beetle] too, I did modify them to suit our needs. @@ -360,7 +510,7 @@ mod tests { if num_chunks / degree == 0 { let chunk = chunks.next().await.unwrap().unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block, _) = leaf.encode().unwrap(); + let (block, _) = leaf.encode_raw().unwrap(); tree[0].push(block); return tree; } @@ -368,7 +518,7 @@ mod tests { while let Some(chunk) = chunks.next().await { let chunk = chunk.unwrap(); let leaf = TreeNode::Leaf(chunk); - let (block @ (cid, _), link_info) = leaf.encode().unwrap(); + let (block @ (cid, _), link_info) = leaf.encode_raw().unwrap(); links[0].push((cid, link_info)); tree[0].push(block); } @@ -380,7 +530,7 @@ mod tests { let mut links_layer = Vec::with_capacity(count); for links in prev_layer.chunks(degree) { let stem = TreeNode::Stem(links.to_vec()); - let (block @ (cid, _), link_info) = stem.encode().unwrap(); + let (block @ (cid, _), link_info) = stem.encode_raw().unwrap(); links_layer.push((cid, link_info)); tree_layer.push(block); } @@ -444,12 +594,12 @@ mod tests { fn make_leaf(data: usize) -> ((Cid, Bytes), LinkInfo) { TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) - .encode() + .encode_raw() .unwrap() } fn make_stem(links: Vec<(Cid, LinkInfo)>) -> ((Cid, Bytes), LinkInfo) { - TreeNode::Stem(links).encode().unwrap() + TreeNode::Stem(links).encode_raw().unwrap() } #[tokio::test] diff --git a/mater/lib/src/v2/reader.rs b/mater/lib/src/v2/reader.rs index 4507c15c9..3a2cf3150 100644 --- a/mater/lib/src/v2/reader.rs +++ b/mater/lib/src/v2/reader.rs @@ -1,13 +1,16 @@ -use ipld_core::cid::Cid; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader}; +use std::collections::HashSet; + +use ipld_core::{cid::Cid, codec::Codec}; +use ipld_dagpb::{DagPbCodec, PbNode}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; use super::index::read_index; use crate::{ + multicodec::DAG_PB_CODE, v1::BlockMetadata, v2::{index::Index, Characteristics, Header, PRAGMA}, Error, }; - /// Low-level CARv2 reader. 
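For intuition about the `stream_balanced_tree_unixfs` stream introduced above: with `width = 2` and three input chunks it yields six blocks, in order: two leaves, a stem linking them, the third leaf, a single-child stem over it, and finally the root stem, so the root is always the last block the importer sees. A minimal test sketch that could sit next to the existing tests in `mater/lib/src/unixfs/mod.rs` (the module is private, so this only compiles in-crate; chunk contents are arbitrary):

    #[tokio::test]
    async fn unixfs_tree_emits_root_last() {
        use bytes::Bytes;
        use futures::StreamExt;

        // Three one-byte chunks fed through the UnixFS balanced tree with width 2.
        let chunks = futures::stream::iter(
            [&b"a"[..], &b"b"[..], &b"c"[..]]
                .into_iter()
                .map(|c| Ok::<_, crate::Error>(Bytes::copy_from_slice(c))),
        );
        let blocks: Vec<_> = super::stream_balanced_tree_unixfs(chunks, 2)
            .collect::<Vec<_>>()
            .await;
        // 3 leaves + 2 intermediate stems + 1 root stem.
        assert_eq!(blocks.len(), 6);
        assert!(blocks.iter().all(|block| block.is_ok()));
    }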
pub struct Reader { reader: R, @@ -22,7 +25,7 @@ impl Reader { impl Reader where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + AsyncSeek, { /// Takes in a CID and checks that the contents in the reader matches this CID pub async fn verify_cid(&mut self, contents_cid: Cid) -> Result<(), Error> { @@ -63,21 +66,48 @@ where { self.read_pragma().await?; let header = self.read_header().await?; - let _v1_header = self.read_v1_header().await?; + let v1_header = self.read_v1_header().await?; let mut written = 0; - while let Ok((_cid, contents)) = self.read_block().await { - // CAR file contents is empty - if contents.len() == 0 { - break; - } + // Keep track of root CID and position + let root_cid = v1_header.roots.first().ok_or(Error::EmptyRootsError)?; + let data_end = header.data_offset + header.data_size; + + // Track what we've processed and need to process + let mut processed: HashSet = HashSet::new(); + let mut to_process = vec![*root_cid]; + + while !to_process.is_empty() { let position = self.get_inner_mut().stream_position().await?; - let data_end = header.data_offset + header.data_size; - // Add the `written != 0` clause for files that are less than a single block. if position >= data_end && written != 0 { break; } - written += output_file.write(&contents).await?; + + if let Ok((cid, contents)) = self.read_block().await { + if contents.len() == 0 { + break; + } + + // Write the block data + written += output_file.write(&contents).await?; + + // If it's a DAG-PB node, queue up its children + if cid.codec() == DAG_PB_CODE && !processed.contains(&cid) { + let reader = std::io::BufReader::new(&contents[..]); + if let Ok(node) = DagPbCodec::decode(reader) { + let pb_node: PbNode = node; + to_process.extend( + pb_node + .links + .iter() + .map(|link| link.cid) + .filter(|cid| !processed.contains(cid)), + ); + } + } + + processed.insert(cid); + } } Ok(()) @@ -164,9 +194,11 @@ where } /// Function verifies that a given CID matches the CID for the CAR file in the given reader -pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> { - let mut reader = Reader::new(BufReader::new(reader)); - +pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> +where + R: AsyncRead + AsyncSeek + Unpin, +{ + let mut reader = Reader::new(reader); reader.verify_cid(contents_cid).await } diff --git a/storage-provider/server/Cargo.toml b/storage-provider/server/Cargo.toml index 597cdc4f3..88f1fb7f0 100644 --- a/storage-provider/server/Cargo.toml +++ b/storage-provider/server/Cargo.toml @@ -19,9 +19,11 @@ polka-storage-provider-common = { workspace = true, features = ["clap"] } primitives = { workspace = true, features = ["clap", "serde", "std"] } storagext = { workspace = true, features = ["clap"] } +async-stream.workspace = true async-trait = { workspace = true } axum = { workspace = true, features = ["macros", "multipart"] } base64 = { workspace = true } +bytes.workspace = true chrono = { workspace = true, features = ["serde"] } ciborium = { workspace = true } cid = { workspace = true, features = ["serde", "std"] } diff --git a/storage-provider/server/src/storage.rs b/storage-provider/server/src/storage.rs index 40663edfb..9c9f18da8 100644 --- a/storage-provider/server/src/storage.rs +++ b/storage-provider/server/src/storage.rs @@ -1,4 +1,4 @@ -use std::{io, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc}; +use std::{io, net::SocketAddr, path::PathBuf, pin::Pin, str::FromStr, sync::Arc}; use axum::{ body::Body, @@ -8,8 +8,9 @@ use axum::{ routing::{get, put}, 
Router, }; -use futures::{TryFutureExt, TryStreamExt}; -use mater::Cid; +use bytes::Bytes; +use futures::{Stream, TryStreamExt}; +use mater::{create_filestore, Cid, Config}; use polka_storage_provider_common::commp::{commp, CommPError}; use primitives::{commitment::piece::PaddedPieceSize, proofs::RegisteredPoStProof}; use tokio::{ @@ -23,6 +24,8 @@ use tokio_util::{ use tower_http::trace::TraceLayer; use uuid::Uuid; +type BoxedStream = Pin> + Send>>; + #[cfg(feature = "delia")] mod delia_imports { pub use axum::{http::Method, response::Json, routing::post}; @@ -42,11 +45,8 @@ use crate::db::DealDB; /// Shared state of the storage server. pub struct StorageServerState { pub car_piece_storage_dir: Arc, - pub deal_db: Arc, - pub listen_address: SocketAddr, - // I think this just needs the sector size actually #[allow(dead_code)] pub post_proof: RegisteredPoStProof, @@ -115,8 +115,13 @@ fn configure_router(state: Arc) -> Router { #[cfg(not(feature = "delia"))] fn config_non_delia(state: Arc) -> Router { + // Type annotation required to satisfy Send bounds needed for UnixFS processing + // across async operations and thread boundaries Router::new() - .route("/upload/:cid", put(upload)) + .route( + "/upload/:cid", + put(upload as fn(State>, Path, Request) -> _), + ) .route("/download/:cid", get(download)) .with_state(state) .layer( @@ -160,19 +165,20 @@ fn configure_router(state: Arc) -> Router { /// ``` #[tracing::instrument(skip_all, fields(cid))] async fn upload( - ref s @ State(ref state): State>, + State(ref state): State>, Path(cid): Path, request: Request, ) -> Result { + // Parse the provided CID. let deal_cid = cid::Cid::from_str(&cid).map_err(|err| { tracing::error!(cid, "failed to parse cid"); (StatusCode::BAD_REQUEST, err.to_string()) })?; + // Use deal_db (we need it now, so we clone it) let deal_db_conn = state.deal_db.clone(); - // If the deal hasn't been accepted, reject the upload + // If the deal hasn't been accepted, reject the upload. let proposed_deal = - // Move the fetch to the blocking pool since the RocksDB API is sync tokio::task::spawn_blocking(move || match deal_db_conn.get_proposed_deal(deal_cid) { Ok(Some(proposed_deal)) => Ok(proposed_deal), Ok(None) => { @@ -186,52 +192,58 @@ async fn upload( tracing::error!(%err, "failed to fetch proposed deal"); Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())) } - }).await.map_err(|err| { + }) + .await + .map_err(|err| { tracing::error!(%err, "failed to execute blocking task"); (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) })??; - // Branching needed here since the resulting `StreamReader`s don't have the same type + // Determine how to obtain the file's bytes: let file_cid = if request.headers().contains_key("Content-Type") { - // Handle multipart forms - let mut multipart = Multipart::from_request(request, &s) - .await - .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; - let Some(field) = multipart - .next_field() - .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string())) - .await? - else { - return Err((StatusCode::BAD_REQUEST, "empty request".to_string())); - }; - - let field_reader = StreamReader::new(field.map_err(std::io::Error::other)); - stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), field_reader) + // For multipart/form-data, we stream the field contents. + let state_clone = state.clone(); + let stream: BoxedStream = Box::pin(async_stream::try_stream! 
{ + let mut multipart = Multipart::from_request(request, &state_clone) + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; + // Get the next field. + let mut field = multipart + .next_field() + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))? + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty request"))?; + // Yield each chunk as it becomes available. + while let Ok(Some(chunk)) = field.chunk().await { + yield chunk; + } + }); + let field_reader = StreamReader::new(stream); + stream_contents_to_car(state.car_piece_storage_dir.as_ref(), field_reader) .await .map_err(|err| { tracing::error!(%err, "failed to store file into CAR archive"); (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) })? } else { - // Read the request body into a CAR archive + // For direct uploads, convert the request body into a stream. let body_reader = StreamReader::new( request .into_body() .into_data_stream() .map_err(|err| io::Error::new(io::ErrorKind::Other, err)), ); - stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), body_reader) + stream_contents_to_car(state.car_piece_storage_dir.as_ref(), body_reader) .await .map_err(|err| { tracing::error!(%err, "failed to store file into CAR archive"); (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) })? }; - tracing::debug!("generated cid: {file_cid}"); - // NOTE(@jmg-duarte,03/10/2024): Maybe we should just register the file in RocksDB and keep a - // background process that vacuums the disk as necessary to simplify error handling here + tracing::debug!("generated cid: {file_cid}"); + // Open the CAR file to check its size. let (_, file_path) = content_path(&state.car_piece_storage_dir, file_cid); let file = File::open(&file_path).await.map_err(|err| { tracing::error!(%err, path = %file_path.display(), "failed to open file"); @@ -239,24 +251,19 @@ async fn upload( })?; let file_size = file .metadata() - .map_ok(|metadata| metadata.len()) .await + .map(|m| m.len()) .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; - // Check the piece size first since it's the cheap check + // Check that the piece size matches the proposal. let piece_size = PaddedPieceSize::from_arbitrary_size(file_size); - if !(proposed_deal.piece_size == *piece_size) { + if proposed_deal.piece_size != *piece_size { tracing::trace!( expected = proposed_deal.piece_size, actual = *piece_size, "piece size does not match the proposal piece size" ); - - // Not handling the error since there's little to be done here... - let _ = tokio::fs::remove_file(&file_path).await.inspect_err( - |err| tracing::error!(%err, path = %file_path.display(), "failed to delete file"), - ); - + let _ = tokio::fs::remove_file(&file_path).await; return Err(( StatusCode::BAD_REQUEST, "piece size does not match proposal".to_string(), @@ -264,13 +271,15 @@ async fn upload( } let piece_path = file_path.clone(); - // Calculate the piece commitment in the blocking thread pool since `calculate_piece_commitment` - // is CPU intensive — i.e. blocking — potentially improvement is to move this completely out of - // the tokio runtime into an OS thread + // Calculate the piece commitment in a blocking task. 
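The multipart branch above turns an axum `Multipart` field into an `AsyncRead` by boxing an `async_stream` of `Bytes` chunks and handing the pinned, boxed stream to `StreamReader`; boxing is what keeps the resulting reader `Unpin + Send`. The same pattern in isolation, with a canned vector standing in for the multipart field (names are illustrative):

    use std::{io, pin::Pin};

    use bytes::Bytes;
    use futures::Stream;
    use tokio::io::AsyncReadExt;
    use tokio_util::io::StreamReader;

    async fn read_all(chunks: Vec<Bytes>) -> io::Result<Vec<u8>> {
        // Any `Stream<Item = io::Result<Bytes>>` works; the upload handler builds
        // one from `field.chunk()` instead of a vector.
        let stream: Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send>> =
            Box::pin(futures::stream::iter(chunks.into_iter().map(Ok)));
        let mut reader = StreamReader::new(stream);
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await?;
        Ok(buf)
    }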
let piece_commitment_cid = tokio::task::spawn_blocking(move || -> Result<_, CommPError> { let (piece_commitment, _) = commp(&piece_path)?; let piece_commitment_cid = piece_commitment.cid(); - tracing::debug!(path = %piece_path.display(), commp = %piece_commitment_cid, "calculated piece commitment"); + tracing::debug!( + path = %piece_path.display(), + commp = %piece_commitment_cid, + "calculated piece commitment" + ); Ok(piece_commitment_cid) }) .await @@ -284,10 +293,7 @@ async fn upload( })?; if proposed_deal.piece_cid != piece_commitment_cid { - if let Err(err) = tokio::fs::remove_file(&file_path).await { - tracing::error!(%err, path = %file_path.display(), "failed to remove uploaded piece"); - } - + let _ = tokio::fs::remove_file(&file_path).await; return Err(( StatusCode::BAD_REQUEST, format!( @@ -298,17 +304,15 @@ async fn upload( } tracing::trace!("renaming car file"); - // We need to rename the file since the original storage name is based on the whole deal proposal CID, - // however, the piece is stored based on its piece_cid tokio::fs::rename( file_path, content_path(&state.car_piece_storage_dir, piece_commitment_cid).1, ) + .await .map_err(|err| { tracing::error!(%err, "failed to rename the CAR file"); (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) - }) - .await?; + })?; Ok(proposed_deal.piece_cid.to_string()) } @@ -366,13 +370,21 @@ fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) { (name, path) } -/// Reads bytes from the source and writes them to a CAR file. +/// Converts a source stream into a CARv2 file and writes it to an output stream. +/// +/// Send + 'static bounds are required because the UnixFS processing involves: +/// - Async stream processing that may cross thread boundaries +/// - State management for DAG construction and deduplication +/// - Block tracking that must be thread-safe +/// +/// The expanded trait bounds ensure that all data can be safely moved between +/// threads during async operations. async fn stream_contents_to_car( folder: &std::path::Path, source: R, ) -> Result> where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + Send + 'static, { // Temp file which will be used to store the CAR file content. The temp // director has a randomized name and is created in the same folder as the @@ -384,7 +396,10 @@ where // Stream the body from source to the temp file. let file = File::create(&temp_file_path).await?; let writer = BufWriter::new(file); - let cid = mater::create_filestore(source, writer, mater::Config::default()).await?; + + let config = Config::default(); + + let cid = create_filestore(source, writer, config).await?; tracing::trace!("finished writing the CAR archive"); // If the file is successfully written, we can now move it to the final @@ -463,7 +478,8 @@ mod delia_endpoints { // Calculate piece commitment let piece_commitment_cid = tokio::task::spawn_blocking(move || -> Result<_, CommPError> { - let (piece_commitment, _) = commp(&piece_path)?; + // Use `file_path` here instead of the undefined `piece_path` + let (piece_commitment, _) = commp(&file_path)?; let piece_commitment_cid = piece_commitment.cid(); tracing::debug!(path = %file_path.display(), commp = %piece_commitment_cid, "calculated piece commitment"); Ok(piece_commitment_cid)
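Taken together, the handler above accepts either a multipart form or a raw request body. A client that has already negotiated a deal could exercise the raw-body branch roughly as follows; this is a sketch using `reqwest`, which is not a dependency introduced by this change, and the server address and deal CID are placeholders:

    // Uploads the payload for an accepted deal and returns the piece CID the
    // server responds with on success.
    async fn put_piece(
        server: &str,
        deal_cid: &str,
        payload: Vec<u8>,
    ) -> Result<String, reqwest::Error> {
        // No Content-Type header is set, so the server takes the raw-body branch
        // and streams the bytes straight into `stream_contents_to_car`.
        let url = format!("http://{server}/upload/{deal_cid}");
        let response = reqwest::Client::new()
            .put(url)
            .body(payload)
            .send()
            .await?
            .error_for_status()?;
        // The handler replies with the proposed deal's piece CID as plain text.
        response.text().await
    }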