diff --git a/Cargo.lock b/Cargo.lock
index f86e5f7e1..51602d2a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9190,8 +9190,6 @@ dependencies = [
  "tempfile",
  "thiserror 2.0.8",
  "tokio",
- "tokio-stream",
- "tokio-util",
 ]
 
 [[package]]
diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml
index 0191772cf..ebd6f11b6 100644
--- a/mater/lib/Cargo.toml
+++ b/mater/lib/Cargo.toml
@@ -26,9 +26,7 @@ serde = { workspace = true, features = ["derive"] }
 serde_ipld_dagcbor.workspace = true
 sha2 = { workspace = true, default-features = true }
 thiserror.workspace = true
-tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }
-tokio-stream.workspace = true
-tokio-util = { workspace = true, features = ["io"] }
+tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread", "sync"] }
 
 # Optional dependencies
 blockstore = { workspace = true, optional = true }
diff --git a/mater/lib/benches/benchmark.rs b/mater/lib/benches/benchmark.rs
index 6b55e4954..2c6c9333f 100644
--- a/mater/lib/benches/benchmark.rs
+++ b/mater/lib/benches/benchmark.rs
@@ -5,11 +5,15 @@ use std::{
     sync::OnceLock,
 };
 
-use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
-use mater::{create_filestore, Blockstore, Config};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use mater::{create_filestore, Blockwriter, Config};
 use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng};
 use tempfile::{tempdir, TempDir};
-use tokio::{fs::File, runtime::Runtime as TokioExecutor};
+use tokio::{
+    fs::File,
+    io::{AsyncSeek, AsyncWrite},
+    runtime::Runtime as TokioExecutor,
+};
 
 static FILES: OnceLock<Vec<(Params, PathBuf, TempDir)>> = OnceLock::new();
 fn get_source_files() -> &'static Vec<(Params, PathBuf, TempDir)> {
@@ -131,53 +135,25 @@ fn generate_content(params: &Params) -> Vec<u8> {
     bytes
 }
 
-/// Read content to a Blockstore. This function is benchmarked.
-async fn read_content_benched(content: &[u8], mut store: Blockstore) {
+/// Read/write content through a Blockwriter. This function is benchmarked.
+async fn read_write_content_benched<W>(content: &[u8], mut store: Blockwriter<W>)
+where
+    W: AsyncWrite + AsyncSeek + Unpin,
+{
     let cursor = Cursor::new(content);
-    store.read(cursor).await.unwrap()
+    store.write_from(cursor).await.unwrap();
+    store.finish().await.unwrap();
 }
 
-fn read(c: &mut Criterion) {
+fn read_write(c: &mut Criterion) {
     let files = get_source_files();
 
     for (params, source_file, _) in files {
         let content = std::fs::read(&source_file).unwrap();
 
         c.bench_with_input(BenchmarkId::new("read", params), params, |b, _params| {
-            b.to_async(TokioExecutor::new().unwrap()).iter(|| {
-                read_content_benched(
-                    &content,
-                    Blockstore::with_parameters(Some(BLOCK_SIZE), None),
-                )
-            });
-        });
-    }
-}
-
-/// Write content from a Blockstore. This function is benchmarked.
-async fn write_contents_benched(buffer: Vec<u8>, store: Blockstore) {
-    store.write(buffer).await.unwrap();
-}
-
-fn write(c: &mut Criterion) {
-    let runtime = TokioExecutor::new().unwrap();
-    let files = get_source_files();
-
-    for (params, source_file, _) in files {
-        let mut blockstore = Blockstore::with_parameters(Some(BLOCK_SIZE), None);
-
-        // Read file contents to the blockstore
-        runtime.block_on(async {
-            let file = File::open(&source_file).await.unwrap();
-            blockstore.read(file).await.unwrap()
-        });
-
-        c.bench_with_input(BenchmarkId::new("write", params), &(), |b, _: &()| {
-            b.to_async(TokioExecutor::new().unwrap()).iter_batched(
-                || (blockstore.clone(), Vec::with_capacity(params.size)),
-                |(blockstore, buffer)| write_contents_benched(buffer, blockstore),
-                BatchSize::SmallInput,
-            );
+            b.to_async(TokioExecutor::new().unwrap())
+                .iter(|| read_write_content_benched(&content, Blockwriter::in_memory()));
         });
     }
 }
@@ -216,7 +192,6 @@ fn filestore(c: &mut Criterion) {
     }
 }
 
-criterion_group!(bench_reading, read);
-criterion_group!(bench_writing, write);
+criterion_group!(bench_reading, read_write);
 criterion_group!(bench_filestore, filestore);
-criterion_main!(bench_reading, bench_writing, bench_filestore);
+criterion_main!(bench_reading, bench_filestore);
diff --git a/mater/lib/src/chunker.rs b/mater/lib/src/chunker.rs
new file mode 100644
index 000000000..38841ffa6
--- /dev/null
+++ b/mater/lib/src/chunker.rs
@@ -0,0 +1,61 @@
+use bytes::{Bytes, BytesMut};
+use futures::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+pub(crate) fn byte_stream_chunker<S>(
+    mut source: S,
+    chunk_size: usize,
+) -> impl Stream<Item = std::io::Result<Bytes>>
+where
+    S: AsyncRead + Unpin,
+{
+    // This custom stream gathers incoming buffers into a single byte chunk of `chunk_size`.
+    // `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt
+    // to fill its buffer before returning, defeating the whole point of properly sized chunks.
+    // There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist:
+    // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8
+    async_stream::try_stream! {
+        let mut buf = BytesMut::with_capacity(chunk_size);
+
+        loop {
+            if buf.capacity() < chunk_size {
+                // BytesMut::reserve *may* allocate more memory than requested to avoid further
+                // allocations. While that's very helpful, it's also unpredictable.
+                // If and when necessary, we can replace this with the following line:
+                // std::mem::replace(buf, BytesMut::with_capacity(chunk_size));
+
+                // Reserve only the difference, as the previous split may have left the buffer
+                // empty or partially filled
+                buf.reserve(chunk_size - buf.capacity());
+            }
+
+            // If the read length is 0, we *assume* we reached EOF.
+            // tokio's docs state that this does not mean we exhausted the reader,
+            // as it may be able to return more bytes later. *However*,
+            // this means there is no right way of knowing when the reader is fully exhausted!
+            // If we need to support a case like that, we just need to track how many times
+            // the reader returned 0 and break at a certain point
+            if source.read_buf(&mut buf).await? == 0 {
+                // EOF but there's still content to yield -> yield it
+                loop {
+                    // Due to the lack of guarantees on the resulting size of `BytesMut::reserve`
+                    // the buffer may contain more than `chunk_size`,
+                    // in that case we must yield the remaining complete chunks first
+                    let chunk = match buf.len() {
+                        0 => break,
+                        len if len <= chunk_size => buf.split(),
+                        _ => buf.split_to(chunk_size),
+                    };
+                    yield chunk.freeze();
+                }
+                break
+            } else if buf.len() >= chunk_size {
+                // The buffer may have a larger capacity than chunk_size due to reserve;
+                // this also means that our read may have read more bytes than we expected.
+                // That's why we check if the length is bigger than the chunk_size and, if so,
+                // we split the buffer to the chunk_size, then freeze and return
+                let chunk = buf.split_to(chunk_size);
+                yield chunk.freeze();
+            } // otherwise, the buffer is not full, so we don't do a thing
+        }
+    }
+}
diff --git a/mater/lib/src/file_reader.rs b/mater/lib/src/file_reader.rs
index 8b3d98475..67ab61d84 100644
--- a/mater/lib/src/file_reader.rs
+++ b/mater/lib/src/file_reader.rs
@@ -174,24 +174,18 @@ where
 mod test {
     use std::{io::Cursor, path::Path};
 
-    use crate::{Blockstore, CarExtractor};
+    use crate::CarExtractor;
 
-    /// Ensures that duplicated blocks
     #[tokio::test]
     async fn read_duplicated_blocks() {
-        let raw_input = std::iter::repeat(0).take(4096).collect::<Vec<u8>>();
-
-        let mut bs = Blockstore::with_parameters(Some(1024), None);
-        bs.read(Cursor::new(raw_input.clone())).await.unwrap();
-
-        // 1519 is the expected CAR file size after deduplicating and writing the CAR file
-        let mut out_car_buffer = Vec::with_capacity(1519);
-        bs.write(&mut out_car_buffer).await.unwrap();
-        assert_eq!(out_car_buffer.len(), 1519);
+        let raw_input = tokio::fs::read("tests/fixtures/original/zero")
+            .await
+            .unwrap();
 
-        let mut loader = CarExtractor::from_vec(out_car_buffer).await.unwrap();
+        let mut loader = CarExtractor::from_path("tests/fixtures/car_v2/zero.car")
+            .await
+            .unwrap();
         let root = loader.roots().await.unwrap()[0];
 
-        let mut out_check = Cursor::new(vec![1u8; 4096]);
         loader.copy_tree(&root, &mut out_check).await.unwrap();
diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs
index 2b8432589..c06888a69 100644
--- a/mater/lib/src/lib.rs
+++ b/mater/lib/src/lib.rs
@@ -12,6 +12,7 @@
 #![cfg_attr(not(test), deny(clippy::unwrap_used))]
 
 mod async_varint;
+mod chunker;
 mod cid;
 mod file_reader;
 mod multicodec;
@@ -25,7 +26,7 @@ pub use cid::{CidExt, MultihashExt};
 pub use file_reader::CarExtractor;
 pub use ipld_core::cid::Cid;
 pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
-pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
+pub use stores::{create_filestore, Blockwriter, Config, FileBlockstore};
 pub use v1::{BlockMetadata, Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
 pub use v2::{
     verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs
index 3b6bb66f3..2a9c0cb73 100644
--- a/mater/lib/src/stores/blockstore.rs
+++ b/mater/lib/src/stores/blockstore.rs
@@ -1,370 +1,237 @@
-// NOTE(@jmg-duarte,28/05/2024): the blockstore can (and should) evolve to support other backends.
-// At the time of writing, there is no need invest more time in it because the current PR(#25) is delayed enough.
-
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{BTreeMap, HashMap},
+    io::Cursor,
+};
 
-use bytes::Bytes;
-use indexmap::IndexMap;
-use integer_encoding::VarInt;
+use futures::stream::StreamExt;
 use ipld_core::cid::Cid;
-use sha2::{Digest, Sha256};
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_stream::StreamExt;
-use tokio_util::io::ReaderStream;
+use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite};
 
 use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH};
 use crate::{
-    multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
-    Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
+    unixfs::stream_balanced_tree,
+    v1::{self},
+    v2, BlockMetadata, Error, Index, IndexEntry, IndexSorted, SingleWidthIndex,
 };
 
-/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory.
-///
-/// The store will chunk data blocks into `chunk_size` and "gather" nodes in groups with at most `tree_width` children.
-/// You can visualize the underlying tree in <https://dag.ipfs.tech/>, using the "Balanced DAG" layout.
-///
-/// It is necessary to keep the blocks somewhere before writing them to a file since the CARv2 header
-/// has data size, index offset and indexes fields, all these requiring information that only becomes
-/// "available" after you process all the blocks.
-///
-/// The store keeps track of ([`Cid`], [`Bytes`]) pairs, performing de-duplication based on the [`Cid`].
-///
-/// **Important note: currently, the blockstore only supports a single file!**
-#[derive(Debug, Clone)]
-pub struct Blockstore {
-    root: Option<Cid>,
-    blocks: IndexMap<Cid, Bytes>,
-    indexed: HashSet<Cid>,
-
-    chunk_size: usize,
-    tree_width: usize,
+/// CAR file writer.
+pub struct Blockwriter<W> {
+    writer: v2::Writer<W>,
+    index: HashMap<Cid, BlockMetadata>,
+    roots: Vec<Cid>,
+    started: bool,
 }
 
-impl Blockstore {
-    /// The size of the [`Header`] when encoded using [`DagCborCodec`].
-    ///
-    /// The formula is: `overhead + 37 * roots.len()`.
-    /// It is based on reversing the CBOR encoding, see an example:
-    /// ```text
-    /// A2                 # map(2)
-    ///   65               # text(5)
-    ///     726F6F7473     # "roots"
-    ///   81               # array(1)
-    ///     D8 2A          # tag(42)
-    ///       58 25        # bytes(37)
-    ///         00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B
-    ///   67               # text(7)
-    ///     76657273696F6E # "version"
-    ///   01               # unsigned(1)
-    /// ```
-    /// In this case we're always doing a single root, so we just use the fixed size: 58
-    ///
-    /// Is this cheating? Yes. The alternative is to encode the CARv1 header twice.
-    /// We can cache it, but for now, this should be better.
-    const V1_HEADER_OVERHEAD: u64 = 58;
-
-    /// Construct a new [`Blockstore`], using the default parameters.
-    pub fn new() -> Self {
-        Default::default()
+impl<W> Blockwriter<W> {
+    /// Creates a new [`Blockwriter`] with the given `writer`.
+    pub fn new(writer: W) -> Self {
+        Self {
+            writer: v2::Writer::new(writer),
+            index: HashMap::new(),
+            roots: Vec::with_capacity(1),
+            started: false,
+        }
+    }
+
+    /// Creates a new [`v1::Header`].
+    fn header_v1(&self) -> v1::Header {
+        // Lack of partial moves makes this clone "required"
+        v1::Header::new(self.roots.clone())
+    }
+
+    /// Creates a new [`v2::Header`].
+    fn header_v2(&self, v1_header: &v1::Header) -> v2::Header {
+        let total_block_encoded_len = self
+            .index
+            .values()
+            .map(BlockMetadata::encoded_len)
+            .sum::<u64>();
+
+        let v1_payload_len = v1_header.encoded_len() as u64 + total_block_encoded_len;
+
+        v2::Header::new(
+            false,
+            v2::Header::SIZE,
+            v1_payload_len,
+            v2::Header::SIZE + v1_payload_len,
+        )
     }
+}
 
-    /// Construct a new [`Blockstore`], using custom parameters.
-    /// If set to `None`, the corresponding default value will be used.
-    pub fn with_parameters(chunk_size: Option<usize>, tree_width: Option<usize>) -> Self {
-        // NOTE(@jmg-duarte,28/05/2024): once the time comes, this method should probably be replaced with a builder
+impl Blockwriter<Cursor<Vec<u8>>> {
+    /// Creates an in-memory [`Blockwriter`].
+    pub fn in_memory() -> Self {
         Self {
-            root: None,
-            blocks: IndexMap::new(),
-            indexed: HashSet::new(),
-            chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE),
-            tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH),
+            writer: v2::Writer::new(Cursor::new(vec![])),
+            index: HashMap::new(),
+            roots: Vec::with_capacity(1),
+            started: false,
         }
     }
+}
 
-    /// Fully read the contents of an arbitrary `reader` into the [`Blockstore`],
-    /// converting the contents into a CARv2 file.
-    pub async fn read<R>(&mut self, reader: R) -> Result<(), Error>
+impl<W> Blockwriter<W>
+where
+    W: AsyncWrite + AsyncSeek + Unpin,
+{
+    /// Writes the contents from `source`, adding a new root to [`Blockwriter`].
+    pub async fn write_from<S>(&mut self, source: S) -> Result<(), Error>
     where
-        R: AsyncRead + Unpin,
+        S: AsyncRead + Unpin,
     {
-        let chunks = ReaderStream::with_capacity(reader, self.chunk_size);
-
-        // The `stream -> pin -> peekable` combo instead of `stream -> peekable -> pin` feels weird
-        // but it has to do with two things:
-        // - The fact that the stream can be self-referential:
-        //   https://users.rust-lang.org/t/why-is-pin-mut-needed-for-iteration-of-async-stream/51107
-        // - Using a tokio_stream::Peekable instead of futures::Peekable, they differ on who is required to be pinned
-        //   - tokio_stream::Peekable::peek(&mut self)
-        //     https://github.com/tokio-rs/tokio/blob/14c17fc09656a30230177b600bacceb9db33e942/tokio-stream/src/stream_ext/peekable.rs#L26-L37
-        //   - futures::Peekable::peek(self: Pin<&mut Self>)
-        //     https://github.com/rust-lang/futures-rs/blob/c507ff833728e2979cf5519fc931ea97308ec876/futures-util/src/stream/stream/peek.rs#L38-L40
-        let tree = stream_balanced_tree(chunks, self.tree_width);
-        tokio::pin!(tree);
-        let mut tree = tree.peekable();
-
-        while let Some(block) = tree.next().await {
-            let (cid, bytes) = block?;
-            self.insert(cid, bytes, true);
-
-            // If the stream is exhausted, we know the current block is the root
-            if tree.peek().await.is_none() {
-                // The root should always be indexed, there's no official spec saying it should though, it just makes sense.
-                // So, if the insert line is changed, the root should be placed in the `indexed` structure here
-                self.root = Some(cid);
-            }
+        if !self.started {
+            self.writer.write_header(&Default::default()).await?;
+            self.writer.write_v1_header(&Default::default()).await?;
+            self.started = true;
         }
 
-        Ok(())
-    }
+        let mut current_position = self.writer.get_inner_mut().stream_position().await?;
+
+        let chunker = crate::chunker::byte_stream_chunker(source, DEFAULT_BLOCK_SIZE);
+        let nodes = stream_balanced_tree(chunker, DEFAULT_TREE_WIDTH).peekable();
+        tokio::pin!(nodes);
+
+        let mut root = None;
+        while let Some(node) = nodes.next().await {
+            let (node_cid, node_bytes) = node?;
+            let block_offset = current_position;
+
+            if !self.index.contains_key(&node_cid) {
+                let written = self.writer.write_block(&node_cid, &node_bytes).await? as u64;
+                current_position += written;
+                self.index.insert(
+                    node_cid,
+                    BlockMetadata {
+                        block_offset,
+                        cid: node_cid,
+                        data_offset_source: block_offset + written,
+                        data_size: node_bytes.len() as u64,
+                    },
+                );
+            }
 
-    /// Write the contents of the [`Blockstore`] as CARv2 to a writer.
-    pub async fn write<W>(mut self, writer: W) -> Result<usize, Error>
-    where
-        W: AsyncWrite + Unpin,
-    {
-        let mut position = 0;
-
-        let mut writer = CarV2Writer::new(writer);
-        let header_v2 = self.header_v2();
-
-        // Writing the CARv1 starts where the CARv2 header ends
-        // this value is required for indexing,
-        // whose offset starts at the beginning of the CARv1 header
-        let car_v1_start = writer.write_header(&header_v2).await?;
-        position += car_v1_start;
-
-        // CARv1 files are REQUIRED to have a root
-        let header_v1 = self
-            .root
-            .map(|root| CarV1Header::new(vec![root]))
-            .ok_or(Error::EmptyRootsError)?;
-        position += writer.write_v1_header(&header_v1).await?;
-
-        let mut offsets = HashMap::new();
-        for (cid, block) in self.blocks.drain(..) {
-            if self.indexed.contains(&cid) {
-                offsets.insert(cid, position - car_v1_start);
+            if nodes.as_mut().peek().await.is_none() {
+                root = Some(node_cid);
             }
-            position += writer.write_block(&cid, &block).await?;
         }
 
-        let count = offsets.len() as u64;
-        let entries = offsets
-            .into_iter()
-            .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64))
-            .collect();
-        let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width(
-            SHA_256_CODE,
-            SingleWidthIndex::new(Sha256::output_size() as u32, count, entries).into(),
-        ));
-        position += writer.write_index(&index).await?;
-
-        Ok(position)
-    }
-
-    /// Get the [`CarV2Header`] that will be written out.
-    fn header_v2(&self) -> CarV2Header {
-        let data_offset = CarV2Header::SIZE;
-        let data_size: u64 = self
-            .blocks
-            .iter()
-            .map(|(cid, bytes)| {
-                let size = (cid.encoded_len() + bytes.len()) as u64;
-                let varint_size = size.required_space() as u64;
-                size + varint_size
-            })
-            .sum();
-
-        let header_v1_varint = Self::V1_HEADER_OVERHEAD.required_space() as u64;
-        let car_v1_payload_length = Self::V1_HEADER_OVERHEAD + header_v1_varint + data_size;
-
-        // If there is padding, this does not apply, however, the go-car tool doesn't seem to ever add padding
-        let index_offset = data_offset + car_v1_payload_length;
-
-        // NOTE(@jmg-duarte,28/05/2024): the `fully_indexed` field is currently set to `false` as the
-        // go-car tool doesn't seem to ever set it, however, according to the written definition we have from the spec
-        // we're performing full indexing, as all blocks are inserted with `index: true`.
-        CarV2Header::new(false, data_offset, car_v1_payload_length, index_offset)
-    }
-
-    /// Insert a new block into the [`Blockstore`].
-    ///
-    /// If the [`Cid`] has been previously inserted, this function is a no-op.
-    fn insert(&mut self, cid: Cid, data: Bytes, index: bool) {
-        if !self.blocks.contains_key(&cid) {
-            self.blocks.insert_full(cid, data);
-            if index {
-                self.indexed.insert(cid);
-            }
+        match root {
+            Some(root) => self.roots.push(root),
+            None => return Err(Error::EmptyRootsError),
         }
+
+        Ok(())
     }
-}
 
-impl Default for Blockstore {
-    fn default() -> Self {
-        Self {
-            root: None,
-            blocks: IndexMap::new(),
-            indexed: HashSet::new(),
-            chunk_size: DEFAULT_BLOCK_SIZE,
-            tree_width: DEFAULT_TREE_WIDTH,
+    /// Writes the final header as well as the indexes, flushes the inner writer and returns it.
+    pub async fn finish(mut self) -> Result<W, Error> {
+        self.writer.get_inner_mut().rewind().await?;
+
+        let v1_header = self.header_v1();
+        let v2_header = self.header_v2(&v1_header);
+
+        self.writer.write_header(&v2_header).await?;
+        self.writer.write_v1_header(&v1_header).await?;
+
+        self.writer
+            .get_inner_mut()
+            .seek(std::io::SeekFrom::Start(v2_header.index_offset))
+            .await?;
+
+        // Abstracting away the index writing is not that simple because we have extra bookkeeping
+        let mut multihash_index: BTreeMap<u64, BTreeMap<usize, Vec<IndexEntry>>> = BTreeMap::new();
+        for (cid, metadata) in self.index {
+            let entry = IndexEntry::new(
+                metadata.cid.hash().digest().to_vec(),
+                metadata.block_offset - v2_header.data_offset,
+            );
+
+            let cid_hash_code = cid.hash().code();
+            let cid_hash_digest_len = cid.hash().digest().len();
+
+            if let Some(single_width_index) = multihash_index.get_mut(&cid_hash_code) {
+                if let Some(entries) = single_width_index.get_mut(&cid_hash_digest_len) {
+                    entries.push(entry);
+                } else {
+                    single_width_index.insert(cid_hash_digest_len, vec![entry]);
+                }
+            } else {
+                let mut single_width_index = BTreeMap::new();
+                single_width_index.insert(cid_hash_digest_len, vec![entry]);
+                multihash_index.insert(cid_hash_code, single_width_index);
+            };
+        }
+        let index = Index::multihash(
+            multihash_index
+                .into_iter()
+                .map(|(codec, index)| {
+                    let index_sorted = index
+                        .into_iter()
+                        .map(|(width, entries)| {
+                            SingleWidthIndex::new(width as u32, entries.len() as u64, entries)
+                        })
+                        .collect::<Vec<_>>();
+                    (codec, IndexSorted(index_sorted))
+                })
+                .collect(),
+        );
+        self.writer.write_index(&index).await?;
+
+        self.writer.finish().await
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::{io::Cursor, str::FromStr};
+    use std::{io::Cursor, path::Path};
 
-    use ipld_core::{cid::Cid, codec::Codec};
-    use ipld_dagpb::{DagPbCodec, PbNode};
-    use sha2::{Digest, Sha256};
     use tokio::fs::File;
 
-    use crate::{
-        multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
-        stores::blockstore::Blockstore,
-        test_utils::assert_buffer_eq,
-        CarV2Header, CarV2Reader, Index,
-    };
-
-    #[tokio::test]
-    async fn byte_eq_lorem() {
-        let file = File::open("tests/fixtures/original/lorem.txt")
-            .await
-            .unwrap();
-        let mut store = Blockstore::new();
-        store.read(file).await.unwrap();
-        assert_eq!(store.blocks.len(), 1);
+    use crate::{stores::blockstore::Blockwriter, test_utils::assert_buffer_eq};
 
-        let mut result_buffer = vec![];
-        store.write(&mut result_buffer).await.unwrap();
+    async fn byte_eq<P1, P2>(original: P1, reference: P2)
+    where
+        P1: AsRef<Path>,
+        P2: AsRef<Path>,
+    {
+        let file = File::open(original).await.unwrap();
+        let reference = tokio::fs::read(reference).await.unwrap();
 
-        let car_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car")
-            .await
-            .unwrap();
+
+        let mut store = Blockwriter::in_memory();
+        store.write_from(file).await.unwrap();
+        let output = store.finish().await.unwrap().into_inner();
 
-        assert_buffer_eq!(&result_buffer, &car_contents);
+        assert_buffer_eq!(&output, &reference);
     }
 
     #[tokio::test]
-    async fn byte_eq_spaceglenda() {
-        let file = File::open("tests/fixtures/original/spaceglenda.jpg")
-            .await
-            .unwrap();
-        let mut store = Blockstore::new();
-        store.read(file).await.unwrap();
-        assert_eq!(store.blocks.len(), 4);
-
-        let mut result_buffer = vec![];
-        store.write(&mut result_buffer).await.unwrap();
-
-        let car_contents = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car")
-            .await
-            .unwrap();
-
-        assert_buffer_eq!(&result_buffer, &car_contents);
+    async fn new_byte_eq_lorem() {
+        byte_eq(
+            "tests/fixtures/original/lorem.txt",
+            "tests/fixtures/car_v2/lorem.car",
+        )
+        .await;
     }
 
     #[tokio::test]
-    async fn dedup_lorem() {
-        let file = File::open("tests/fixtures/original/lorem_4096_dup.txt")
-            .await
-            .unwrap();
-        let mut store = Blockstore::with_parameters(Some(1024), None);
-        store.read(file).await.unwrap();
-        // We're expecting there to exist a single data block and a root
-        assert_eq!(store.blocks.len(), 2);
+    async fn byte_eq_spaceglenda() {
+        byte_eq(
+            "tests/fixtures/original/spaceglenda.jpg",
+            "tests/fixtures/car_v2/spaceglenda.car",
+        )
+        .await;
     }
 
-    // We can't fully validate this test using go-car because they don't offer parametrization
     #[tokio::test]
-    async fn dedup_lorem_roundtrip() {
-        let file = File::open("tests/fixtures/original/lorem_4096_dup.txt")
-            .await
-            .unwrap();
-        let mut store = Blockstore::with_parameters(Some(1024), None);
-        store.read(file).await.unwrap();
-        // We're expecting there to exist a single data block and a root
-        assert_eq!(store.blocks.len(), 2);
-
-        let mut result_buffer = vec![];
-        store.write(&mut result_buffer).await.unwrap();
-
-        let mut cursor = Cursor::new(result_buffer);
-        std::io::Seek::rewind(&mut cursor).unwrap();
-        let mut car_reader = CarV2Reader::new(cursor);
-
-        car_reader.read_pragma().await.unwrap();
-
-        let car_v2_header = car_reader.read_header().await.unwrap();
-        assert_eq!(car_v2_header.data_offset, CarV2Header::SIZE);
-        // Extracted with go-car and validated with an hex viewer
-        // to extract the values, run the following commands:
-        // $ car inspect
-        // The dump is necessary because go-car does not support parametrization
-        assert_eq!(car_v2_header.data_size, 1358);
-        assert_eq!(car_v2_header.index_offset, CarV2Header::SIZE + 1358);
-
-        let car_v1_header = car_reader.read_v1_header().await.unwrap();
-        assert_eq!(car_v1_header.roots.len(), 1);
-
-        // Extracted with go-car
-        let root_cid =
-            Cid::from_str("bafybeiapxsorxw7yqywquebgmlz37nyjt44vxlskhx6wcgirkurojow7xu").unwrap();
-        assert_eq!(car_v1_header.roots[0], root_cid);
-
-        let original_1024 = tokio::fs::read("tests/fixtures/original/lorem_1024.txt")
+    async fn dedup() {
+        let input = Cursor::new(vec![0u8; 524288]);
+        let mut store = Blockwriter::in_memory();
+        store.write_from(input).await.unwrap();
+        let output = store.finish().await.unwrap().into_inner();
+
+        let reference = tokio::fs::read("tests/fixtures/car_v2/zero.car")
             .await
             .unwrap();
-        let (cid, data) = car_reader.read_block().await.unwrap();
-        assert_buffer_eq!(&data, &original_1024);
-        let lorem_cid = Cid::new_v1(RAW_CODE, generate_multihash::<Sha256>(&original_1024));
-        assert_eq!(cid, lorem_cid);
-
-        let (cid, data) = car_reader.read_block().await.unwrap();
-        let node: PbNode = DagPbCodec::decode_from_slice(&data).unwrap();
-        assert_eq!(cid, root_cid);
-
-        // There are 4 blocks of repeated 1024 bytes
-        assert_eq!(node.links.len(), 4);
-
-        for pb_link in node.links {
-            assert_eq!(pb_link.cid, lorem_cid);
-            assert_eq!(pb_link.name, Some("".to_string()));
-            assert_eq!(pb_link.size, Some(1024));
-        }
-
-        let index = car_reader.read_index().await.unwrap();
-
-        match index {
-            Index::MultihashIndexSorted(index) => {
-                // There's only Sha256
-                assert_eq!(index.len(), 1);
-
-                let index_sorted = &index[&SHA_256_CODE];
-                // There's only a single length
-                assert_eq!(index_sorted.len(), 1);
-
-                let single_width_index = &index_sorted[0];
-                assert_eq!(single_width_index.count, 2);
-                // Sha256 output size (32) + the offset size (8)
-                assert_eq!(single_width_index.width, Sha256::output_size() as u32 + 8);
-                assert_eq!(single_width_index.entries.len(), 2);
-
-                // Sorting order is byte-wise, I extracted it manually
-                assert_eq!(single_width_index.entries[0].offset, 1121);
-                assert_eq!(single_width_index.entries[1].offset, 59);
-                assert_eq!(
-                    single_width_index.entries[0].digest,
-                    root_cid.hash().digest()
-                );
-                assert_eq!(
-                    single_width_index.entries[1].digest,
-                    lorem_cid.hash().digest()
-                );
-            }
-            Index::IndexSorted(_) => panic!("expected index to be MultihashIndexSorted"),
-        }
+        assert_buffer_eq!(&output, &reference);
     }
 }
diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs
index b8e494a21..704add767 100644
--- a/mater/lib/src/stores/filestore.rs
+++ b/mater/lib/src/stores/filestore.rs
@@ -1,8 +1,7 @@
-use bytes::BytesMut;
 use futures::stream::StreamExt;
 use ipld_core::cid::Cid;
 use sha2::{Digest, Sha256};
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite};
+use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite};
 
 use super::Config;
 use crate::{
@@ -11,7 +10,7 @@ use crate::{
 };
 
 async fn balanced_import<Src, Out>(
-    mut source: Src,
+    source: Src,
     mut output: Out,
     chunk_size: usize,
     tree_width: usize,
@@ -20,49 +19,7 @@ where
     Src: AsyncRead + Unpin,
     Out: AsyncWrite + AsyncSeek + Unpin,
 {
-    // This custom stream gathers incoming buffers into a single byte chunk of `chunk_size`
-    // `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt
-    // to fill it's buffer before returning, voiding the whole promise of properly sized chunks
-    // There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist:
-    // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8
-    let chunker = async_stream::try_stream! {
-        let mut buf = BytesMut::with_capacity(chunk_size);
-
-        loop {
-            if buf.capacity() < chunk_size {
-                // BytesMut::reserve *may* allocate more memory than requested to avoid further
-                // allocations, while that's very helpful, it's also unpredictable.
-                // If and when necessary, we can replace this with the following line:
-                // std::mem::replace(buf, BytesMut::with_capacity(chunk_size)):
-
-                // Reserve only the difference as the split may leave nothing, or something
-                buf.reserve(chunk_size - buf.capacity());
-            }
-
-            // If the read length is 0, we *assume* we reached EOF
-            // tokio's docs state that this does not mean we exhausted the reader,
-            // as it may be able to return more bytes later, *however*,
-            // this means there is no right way of knowing when the reader is fully exhausted!
-            // If we need to support a case like that, we just need to track how many times
-            // the reader returned 0 and break at a certain point
-            if source.read_buf(&mut buf).await? == 0 {
-                // EOF but there's still content to yield -> yield it
-                if buf.len() > 0 {
-                    let chunk = buf.split();
-                    yield chunk.freeze();
-                }
-                break
-            } else if buf.len() >= chunk_size {
-                // The buffer may have a larger capacity than chunk_size due to reserve
-                // this also means that our read may have read more bytes than we expected,
-                // thats why we check if the length if bigger than the chunk_size and if so
-                // we split the buffer to the chunk_size, then freeze and return
-                let chunk = buf.split_to(chunk_size);
-                yield chunk.freeze();
-            } // otherwise, the buffer is not full, so we don't do a thing
-        }
-    };
-
+    let chunker = crate::chunker::byte_stream_chunker(source, chunk_size);
     let nodes = stream_balanced_tree(chunker, tree_width).peekable();
     tokio::pin!(nodes);
 
diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs
index 66920d97d..ffb6d9d24 100644
--- a/mater/lib/src/stores/mod.rs
+++ b/mater/lib/src/stores/mod.rs
@@ -2,7 +2,7 @@ mod blockstore;
 mod file;
 mod filestore;
 
-pub use blockstore::Blockstore;
+pub use blockstore::Blockwriter;
 pub use file::FileBlockstore;
 pub use filestore::create_filestore;
 
diff --git a/mater/lib/src/unixfs/mod.rs b/mater/lib/src/unixfs/mod.rs
index 544e0a37c..e4a0ab1b5 100644
--- a/mater/lib/src/unixfs/mod.rs
+++ b/mater/lib/src/unixfs/mod.rs
@@ -7,12 +7,11 @@ use std::collections::VecDeque;
 
 use async_stream::try_stream;
 use bytes::Bytes;
-use futures::TryStreamExt;
+use futures::{Stream, StreamExt, TryStreamExt};
 use ipld_core::{cid::Cid, codec::Codec};
 use ipld_dagpb::{DagPbCodec, PbLink, PbNode};
 use quick_protobuf::MessageWrite;
 use sha2::Sha256;
-use tokio_stream::{Stream, StreamExt};
 
 use crate::{
     multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE},
@@ -339,8 +338,6 @@ mod tests {
     //! and there's enough tests around the repo to ensure that if the underlying
     //! bytes are equal, the expected block sizes are as well.
     //!
-    //! We also didn't write our own chunker, relying on [`tokio_util::io::ReadStream`] instead.
-    //!
     //! [beetle]: https://github.com/n0-computer/beetle/blob/3e137cb2bc18e1d458c3f72d5e817b03d9537d5d/iroh-unixfs/src/balanced_tree.rs#L234-L507
 
     use bytes::BytesMut;
diff --git a/mater/lib/src/v1/mod.rs b/mater/lib/src/v1/mod.rs
index 67e442cfa..c949d8716 100644
--- a/mater/lib/src/v1/mod.rs
+++ b/mater/lib/src/v1/mod.rs
@@ -1,6 +1,7 @@
 mod reader;
 mod writer;
 
+use integer_encoding::VarInt;
 use ipld_core::cid::{multihash::Multihash, Cid};
 use serde::{Deserialize, Serialize};
 
@@ -38,6 +39,73 @@ impl Header {
     pub fn new(roots: Vec<Cid>) -> Self {
         Self { version: 1, roots }
     }
+
+    /// The static components of the CBOR encoded [`Header`].
+    ///
+    /// The following is a CBOR encoded [`Header`] with a single CID:
+    /// ```text
+    /// A2                 # map(2)
+    ///   65               # text(5)
+    ///     726F6F7473     # "roots"
+    ///   81               # array(1)
+    ///     D8 2A          # tag(42)
+    ///       58 25        # bytes(37)
+    ///         00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B
+    ///   67               # text(7)
+    ///     76657273696F6E # "version"
+    ///   01               # unsigned(1)
+    /// ```
+    ///
+    /// When calculating the CBOR encoded length, the only thing that changes is the number of
+    /// elements inside the array; as such, the static overhead is everything *but*:
+    /// ```text
+    /// D8 2A      # tag(42)
+    ///   58 25    # bytes(37)
+    ///     00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B
+    /// ```
+    const fn cbor_static_overhead() -> usize {
+        1 + // map
+        1 + // text
+        5 + // "roots"
+        1 + // array
+        1 + // text
+        7 + // "version"
+        1 // unsigned(1)
+    }
+
+    /// The length of the CBOR encoded [`Cid`].
+    const fn cbor_cid_encoded_len() -> usize {
+        2 + // tag(42)
+        2 + // bytes
+        37 // the encoded CID itself
+    }
+
+    /// Returns the encoded length of the header, including the VarInt size prefix.
+    ///
+    /// The formula is `Self::cbor_static_overhead() + Self::cbor_cid_encoded_len() * roots.len()`,
+    /// plus the VarInt prefix for that length. It is based on reversing the CBOR encoding shown
+    /// in [`Self::cbor_static_overhead`]; for a single root, the header itself is 58 bytes long.
+    ///
+    /// This is cheaper than the alternative: encoding the CARv1 header an extra time just to
+    /// measure its length.
+    pub fn encoded_len(&self) -> usize {
+        let header_encoded_len =
+            Self::cbor_static_overhead() + Self::cbor_cid_encoded_len() * self.roots.len();
+        header_encoded_len.required_space() + header_encoded_len
+    }
 }
 
 impl Default for Header {
@@ -75,6 +143,14 @@ pub struct BlockMetadata {
     pub data_size: u64,
 }
 
+impl BlockMetadata {
+    /// The length of the encoded block, including the VarInt prefix and CID.
+    pub fn encoded_len(&self) -> u64 {
+        let len = self.cid.encoded_len() as u64 + self.data_size;
+        len.required_space() as u64 + len
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::io::Cursor;
diff --git a/mater/lib/src/v2/index.rs b/mater/lib/src/v2/index.rs
index 60ecf6452..33c37456b 100644
--- a/mater/lib/src/v2/index.rs
+++ b/mater/lib/src/v2/index.rs
@@ -140,7 +140,7 @@ impl TryFrom<Vec<IndexEntry>> for SingleWidthIndex {
 ///
 /// For more details, read the [`Format 0x0400: IndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted) section in the CARv2 specification.
 #[derive(Debug, PartialEq, Eq)]
-pub struct IndexSorted(Vec<SingleWidthIndex>);
+pub struct IndexSorted(pub Vec<SingleWidthIndex>);
 
 impl Deref for IndexSorted {
     type Target = Vec<SingleWidthIndex>;
 
diff --git a/mater/lib/src/v2/mod.rs b/mater/lib/src/v2/mod.rs
index a884e5b32..4696fec0a 100644
--- a/mater/lib/src/v2/mod.rs
+++ b/mater/lib/src/v2/mod.rs
@@ -64,7 +64,7 @@ impl Default for Characteristics {
 pub struct Header {
     /// Describes certain features of the enclosed data.
     pub characteristics: Characteristics,
-    /// Byte-offset from the beginning of the CARv2 pragma to the first byte of the CARv1 data payload.
+    /// Byte-offset from the beginning of the CARv2 pragma to the first byte of the CARv1 payload.
     pub data_offset: u64,
     /// Byte-length of the CARv1 data payload.
     pub data_size: u64,
diff --git a/mater/lib/src/v2/writer.rs b/mater/lib/src/v2/writer.rs
index e27c5da44..e1df3ac4f 100644
--- a/mater/lib/src/v2/writer.rs
+++ b/mater/lib/src/v2/writer.rs
@@ -100,16 +100,16 @@ where
 mod tests {
     use std::{collections::BTreeMap, io::Cursor};
 
+    use futures::StreamExt;
     use ipld_core::cid::Cid;
     use sha2::Sha256;
     use tokio::{
         fs::File,
         io::{AsyncSeekExt, BufWriter},
     };
-    use tokio_stream::StreamExt;
-    use tokio_util::io::ReaderStream;
 
     use crate::{
+        chunker::byte_stream_chunker,
         multicodec::{generate_multihash, MultihashCode, RAW_CODE},
         test_utils::assert_buffer_eq,
         unixfs::stream_balanced_tree,
@@ -244,11 +244,11 @@ mod tests {
         .await
         .unwrap();
         // https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13
-        let file_chunker = ReaderStream::with_capacity(file, 1024 * 256);
+        let file_chunker = byte_stream_chunker(file, 1024 * 256);
         let nodes = stream_balanced_tree(file_chunker, 11)
-            .collect::<Result<Vec<_>, _>>()
-            .await
-            .unwrap();
+            .map(|res| res.unwrap())
+            .collect::<Vec<_>>()
+            .await;
 
         // To simplify testing, the values were extracted using `car inspect`
         writer
diff --git a/mater/lib/tests/fixtures/car_v2/zero.car b/mater/lib/tests/fixtures/car_v2/zero.car
new file mode 100644
index 000000000..3df0cef57
Binary files /dev/null and b/mater/lib/tests/fixtures/car_v2/zero.car differ
diff --git a/mater/lib/tests/fixtures/original/lorem_1024.txt b/mater/lib/tests/fixtures/original/lorem_1024.txt
deleted file mode 100644
index eb4ec57a2..000000000
--- a/mater/lib/tests/fixtures/original/lorem_1024.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in.
-Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas
\ No newline at end of file
diff --git a/mater/lib/tests/fixtures/original/lorem_4096_dup.txt b/mater/lib/tests/fixtures/original/lorem_4096_dup.txt
deleted file mode 100644
index cc2bf9e95..000000000
--- a/mater/lib/tests/fixtures/original/lorem_4096_dup.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in.
-Pretium nibh ipsum consequat nisl vel pretium.
Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. -Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. -Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. -Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. 
Maecenas \ No newline at end of file diff --git a/mater/lib/tests/fixtures/original/zero b/mater/lib/tests/fixtures/original/zero new file mode 100644 index 000000000..8dd9b3239 Binary files /dev/null and b/mater/lib/tests/fixtures/original/zero differ
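
For reference, here is a minimal end-to-end sketch of the new `Blockwriter` API, using only the methods this diff introduces (`in_memory`, `write_from`, `finish`, `into_inner`); the zero-filled input mirrors the `dedup` test above and is illustrative only:

```rust
use std::io::Cursor;

use mater::Blockwriter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stage a CARv2 archive fully in memory (the inner writer is Cursor<Vec<u8>>).
    let mut writer = Blockwriter::in_memory();

    // Chunks the source, builds the balanced UnixFS tree, writes the
    // de-duplicated blocks, and records the tree root as a CAR root.
    writer.write_from(Cursor::new(vec![0u8; 524288])).await?;

    // Rewinds to rewrite the v2/v1 headers with the final payload length,
    // seeks to `index_offset`, appends the index, and returns the inner writer.
    let car = writer.finish().await?.into_inner();
    println!("CARv2 archive: {} bytes", car.len());
    Ok(())
}
```

Compared to the old `Blockstore`, which buffered every block in an `IndexMap` until `write` was called, this flow streams blocks straight to the writer and only keeps per-block `BlockMetadata` for the final header and index pass.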