diff --git a/mater/cli/src/convert.rs b/mater/cli/src/convert.rs index 09e1a80b7..cd0802ec1 100644 --- a/mater/cli/src/convert.rs +++ b/mater/cli/src/convert.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use mater::{create_filestore, Cid, Config, Error}; +use mater::{Blockwriter, Cid, Error}; use tokio::fs::File; /// Converts a file at location `input_path` to a CARv2 file at `output_path` @@ -16,10 +16,10 @@ pub(crate) async fn convert_file_to_car( }?; if input_path.as_os_str() == "-" { - create_filestore(tokio::io::stdin(), output_file, Config::default()).await + Blockwriter::import(tokio::io::stdin(), output_file).await } else { let source_file = File::open(input_path).await?; - create_filestore(source_file, output_file, Config::default()).await + Blockwriter::import(source_file, output_file).await } } diff --git a/mater/lib/benches/benchmark.rs b/mater/lib/benches/benchmark.rs index 2c6c9333f..5d801d561 100644 --- a/mater/lib/benches/benchmark.rs +++ b/mater/lib/benches/benchmark.rs @@ -6,7 +6,7 @@ use std::{ }; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use mater::{create_filestore, Blockwriter, Config}; +use mater::Blockwriter; use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng}; use tempfile::{tempdir, TempDir}; use tokio::{ @@ -169,17 +169,15 @@ fn prepare_source_file(content: &[u8]) -> (TempDir, PathBuf) { (temp_dir, file) } -/// Create a filestore. This function is benchmarked. -async fn create_filestore_benched(source: &Path, target: &Path) { +/// Import a source into a writer. This function is benchmarked. +async fn blockwriter_import(source: &Path, target: &Path) { let source_file = File::open(source).await.unwrap(); let output_file = File::create(target).await.unwrap(); - create_filestore(source_file, output_file, Config::default()) - .await - .unwrap(); + Blockwriter::import(source_file, output_file).await.unwrap(); } -fn filestore(c: &mut Criterion) { +fn import(c: &mut Criterion) { let files = get_source_files(); for (params, source_file, temp_dir) in files { @@ -187,11 +185,11 @@ fn filestore(c: &mut Criterion) { c.bench_with_input(BenchmarkId::new("filestore", params), &(), |b, _: &()| { b.to_async(TokioExecutor::new().unwrap()) - .iter(|| create_filestore_benched(&source_file, &target_file)); + .iter(|| blockwriter_import(&source_file, &target_file)); }); } } criterion_group!(bench_reading, read_write); -criterion_group!(bench_filestore, filestore); -criterion_main!(bench_reading, bench_filestore); +criterion_group!(bench_import, import); +criterion_main!(bench_reading, bench_import); diff --git a/mater/lib/src/file_reader.rs b/mater/lib/src/file_reader.rs index 67ab61d84..97898a62d 100644 --- a/mater/lib/src/file_reader.rs +++ b/mater/lib/src/file_reader.rs @@ -178,10 +178,6 @@ mod test { #[tokio::test] async fn read_duplicated_blocks() { - let raw_input = tokio::fs::read("tests/fixtures/original/zero") - .await - .unwrap(); - let mut loader = CarExtractor::from_path("tests/fixtures/car_v2/zero.car") .await .unwrap(); @@ -189,7 +185,11 @@ mod test { let mut out_check = Cursor::new(vec![1u8; 4096]); loader.copy_tree(&root, &mut out_check).await.unwrap(); - assert_eq!(raw_input, out_check.into_inner()); + let expected = [0u8; 524288].as_slice(); + let inner = out_check.into_inner(); + let result = inner.as_slice(); + + assert_eq!(expected, result); } async fn load_and_compare(original: P1, path: P2) diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs index c06888a69..37776f04a 100644 --- a/mater/lib/src/lib.rs +++ b/mater/lib/src/lib.rs @@ -26,7 +26,7 @@ pub use cid::{CidExt, MultihashExt}; pub use file_reader::CarExtractor; pub use ipld_core::cid::Cid; pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE}; -pub use stores::{create_filestore, Blockwriter, Config, FileBlockstore}; +pub use stores::{Blockwriter, Config, FileBlockstore}; pub use v1::{BlockMetadata, Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; pub use v2::{ verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 2a9c0cb73..9441cfadd 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -7,11 +7,10 @@ use futures::stream::StreamExt; use ipld_core::cid::Cid; use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; -use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH}; use crate::{ unixfs::stream_balanced_tree, v1::{self}, - v2, BlockMetadata, Error, Index, IndexEntry, IndexSorted, SingleWidthIndex, + v2, BlockMetadata, Config, Error, Index, IndexEntry, IndexSorted, SingleWidthIndex, }; /// CAR file writer. @@ -20,6 +19,7 @@ pub struct Blockwriter { index: HashMap, roots: Vec, started: bool, + config: Config, } impl Blockwriter { @@ -30,6 +30,7 @@ impl Blockwriter { index: HashMap::new(), roots: Vec::with_capacity(1), started: false, + config: Default::default(), } } @@ -66,6 +67,7 @@ impl Blockwriter>> { index: HashMap::new(), roots: Vec::with_capacity(1), started: false, + config: Default::default(), } } } @@ -74,6 +76,19 @@ impl Blockwriter where W: AsyncWrite + AsyncSeek + Unpin, { + /// Convert `source` into a CAR file, writing it to `writer`. + /// Returns the root [`Cid`]. + pub async fn import(source: S, writer: W) -> Result + where + S: AsyncRead + Unpin, + { + let mut writer = Self::new(writer); + writer.write_from(source).await?; + let root = *writer.roots.first().ok_or(Error::EmptyRootsError)?; + writer.finish().await?; + Ok(root) + } + /// Writes the contents from `source`, adding a new root to [`Blockwriter`]. pub async fn write_from(&mut self, source: S) -> Result<(), Error> where @@ -87,8 +102,15 @@ where let mut current_position = self.writer.get_inner_mut().stream_position().await?; - let chunker = crate::chunker::byte_stream_chunker(source, DEFAULT_BLOCK_SIZE); - let nodes = stream_balanced_tree(chunker, DEFAULT_TREE_WIDTH).peekable(); + let nodes = match self.config { + Config::Balanced { + chunk_size, + tree_width, + } => { + let chunker = crate::chunker::byte_stream_chunker(source, chunk_size); + stream_balanced_tree(chunker, tree_width).peekable() + } + }; tokio::pin!(nodes); let mut root = None; diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs deleted file mode 100644 index 704add767..000000000 --- a/mater/lib/src/stores/filestore.rs +++ /dev/null @@ -1,150 +0,0 @@ -use futures::stream::StreamExt; -use ipld_core::cid::Cid; -use sha2::{Digest, Sha256}; -use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; - -use super::Config; -use crate::{ - multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, - Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, -}; - -async fn balanced_import( - source: Src, - mut output: Out, - chunk_size: usize, - tree_width: usize, -) -> Result -where - Src: AsyncRead + Unpin, - Out: AsyncWrite + AsyncSeek + Unpin, -{ - let chunker = crate::chunker::byte_stream_chunker(source, chunk_size); - let nodes = stream_balanced_tree(chunker, tree_width).peekable(); - tokio::pin!(nodes); - - let mut writer = CarV2Writer::new(&mut output); - let mut position = 0; - - let placeholder_header = CarV2Header::default(); - position += writer.write_header(&placeholder_header).await?; - let car_v1_start = position; - - let placeholder_header_v1 = CarV1Header::default(); - position += writer.write_v1_header(&placeholder_header_v1).await?; - - let mut root = None; - let mut entries = vec![]; - while let Some(node) = nodes.next().await { - let (node_cid, node_bytes) = node?; - let digest = node_cid.hash().digest().to_owned(); - let entry = IndexEntry::new(digest, (position - car_v1_start) as u64); - entries.push(entry); - position += writer.write_block(&node_cid, &node_bytes).await?; - - if nodes.as_mut().peek().await.is_none() { - root = Some(node_cid); - } - } - - let Some(root) = root else { - return Err(Error::EmptyRootsError); - }; - - let index_offset = position; - let single_width_index = - SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries); - let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( - SHA_256_CODE, - single_width_index.into(), - )); - writer.write_index(&index).await?; - - // Go back to the beginning of the file - writer.get_inner_mut().rewind().await?; - let header = CarV2Header::new( - false, - (car_v1_start) as u64, - (index_offset - car_v1_start) as u64, - (index_offset) as u64, - ); - writer.write_header(&header).await?; - - // If the length of the roots doesn't match the previous one, you WILL OVERWRITE parts of the file - let header_v1 = CarV1Header::new(vec![root]); - writer.write_v1_header(&header_v1).await?; - - // Flush even if the caller doesn't - we did our best - writer.finish().await?; - - Ok(root) -} - -/// Convert a `source` stream into a CARv2 file and write it to an `output` stream. -pub async fn create_filestore( - source: Src, - output: Out, - config: Config, -) -> Result -where - Src: AsyncRead + Unpin, - Out: AsyncWrite + AsyncSeek + Unpin, -{ - match config { - Config::Balanced { - chunk_size, - tree_width, - } => balanced_import(source, output, chunk_size, tree_width).await, - } -} - -#[cfg(test)] -mod test { - use std::path::Path; - - use tempfile::tempdir; - use tokio::fs::File; - - use crate::{ - stores::{filestore::create_filestore, Config}, - test_utils::assert_buffer_eq, - }; - - async fn test_filestore_roundtrip(original: P1, expected: P2) - where - P1: AsRef, - P2: AsRef, - { - let temp_dir = tempdir().unwrap(); - let temp_path = temp_dir.path().join("temp.car"); - - let source_file = File::open(original).await.unwrap(); - let output_file = File::create(&temp_path).await.unwrap(); - create_filestore(source_file, output_file, Config::default()) - .await - .unwrap(); - - let expected = tokio::fs::read(expected.as_ref()).await.unwrap(); - let result = tokio::fs::read(temp_path).await.unwrap(); - - assert_buffer_eq!(&expected, &result); - } - - #[tokio::test] - async fn test_filestore_lorem() { - test_filestore_roundtrip( - "tests/fixtures/original/lorem.txt", - "tests/fixtures/car_v2/lorem.car", - ) - .await - } - - #[tokio::test] - async fn test_filestore_spaceglenda() { - test_filestore_roundtrip( - "tests/fixtures/original/spaceglenda.jpg", - "tests/fixtures/car_v2/spaceglenda.car", - ) - .await - } -} diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs index ffb6d9d24..970cb75c7 100644 --- a/mater/lib/src/stores/mod.rs +++ b/mater/lib/src/stores/mod.rs @@ -1,10 +1,8 @@ mod blockstore; mod file; -mod filestore; pub use blockstore::Blockwriter; pub use file::FileBlockstore; -pub use filestore::create_filestore; /// The default block size, as defined in /// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13). diff --git a/mater/lib/tests/fixtures/original/zero b/mater/lib/tests/fixtures/original/zero deleted file mode 100644 index 8dd9b3239..000000000 Binary files a/mater/lib/tests/fixtures/original/zero and /dev/null differ diff --git a/storage-provider/server/src/storage.rs b/storage-provider/server/src/storage.rs index 340101a74..526e355be 100644 --- a/storage-provider/server/src/storage.rs +++ b/storage-provider/server/src/storage.rs @@ -404,7 +404,7 @@ where // Stream the body from source to the temp file. let file = File::create(&temp_file_path).await?; let writer = BufWriter::new(file); - let cid = mater::create_filestore(source, writer, mater::Config::default()).await?; + let cid = mater::Blockwriter::import(source, writer).await?; tracing::trace!("finished writing the CAR archive"); // If the file is successfully written, we can now move it to the final