From ad77e05b095b041bce62f604bb077297270bb082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= Date: Fri, 21 Feb 2025 15:26:11 +0000 Subject: [PATCH] feat(mater): replace blockstore with a more general abstraction (#772) --- Cargo.lock | 2 - mater/lib/Cargo.toml | 4 +- mater/lib/benches/benchmark.rs | 63 +-- mater/lib/src/chunker.rs | 61 +++ mater/lib/src/file_reader.rs | 20 +- mater/lib/src/lib.rs | 3 +- mater/lib/src/stores/blockstore.rs | 505 +++++++----------- mater/lib/src/stores/filestore.rs | 49 +- mater/lib/src/stores/mod.rs | 2 +- mater/lib/src/unixfs/mod.rs | 5 +- mater/lib/src/v1/mod.rs | 76 +++ mater/lib/src/v2/index.rs | 2 +- mater/lib/src/v2/mod.rs | 2 +- mater/lib/src/v2/writer.rs | 12 +- mater/lib/tests/fixtures/car_v2/zero.car | Bin 0 -> 262549 bytes .../tests/fixtures/original/lorem_1024.txt | 2 - .../fixtures/original/lorem_4096_dup.txt | 5 - mater/lib/tests/fixtures/original/zero | Bin 0 -> 524288 bytes 18 files changed, 365 insertions(+), 448 deletions(-) create mode 100644 mater/lib/src/chunker.rs create mode 100644 mater/lib/tests/fixtures/car_v2/zero.car delete mode 100644 mater/lib/tests/fixtures/original/lorem_1024.txt delete mode 100644 mater/lib/tests/fixtures/original/lorem_4096_dup.txt create mode 100644 mater/lib/tests/fixtures/original/zero diff --git a/Cargo.lock b/Cargo.lock index f86e5f7e1..51602d2a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9190,8 +9190,6 @@ dependencies = [ "tempfile", "thiserror 2.0.8", "tokio", - "tokio-stream", - "tokio-util", ] [[package]] diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml index 0191772cf..ebd6f11b6 100644 --- a/mater/lib/Cargo.toml +++ b/mater/lib/Cargo.toml @@ -26,9 +26,7 @@ serde = { workspace = true, features = ["derive"] } serde_ipld_dagcbor.workspace = true sha2 = { workspace = true, default-features = true } thiserror.workspace = true -tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] } -tokio-stream.workspace = true -tokio-util = { workspace = true, features = ["io"] } +tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread", "sync"] } # Optional dependencies blockstore = { workspace = true, optional = true } diff --git a/mater/lib/benches/benchmark.rs b/mater/lib/benches/benchmark.rs index 6b55e4954..2c6c9333f 100644 --- a/mater/lib/benches/benchmark.rs +++ b/mater/lib/benches/benchmark.rs @@ -5,11 +5,15 @@ use std::{ sync::OnceLock, }; -use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion}; -use mater::{create_filestore, Blockstore, Config}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use mater::{create_filestore, Blockwriter, Config}; use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng}; use tempfile::{tempdir, TempDir}; -use tokio::{fs::File, runtime::Runtime as TokioExecutor}; +use tokio::{ + fs::File, + io::{AsyncSeek, AsyncWrite}, + runtime::Runtime as TokioExecutor, +}; static FILES: OnceLock> = OnceLock::new(); fn get_source_files() -> &'static Vec<(Params, PathBuf, TempDir)> { @@ -131,53 +135,25 @@ fn generate_content(params: &Params) -> Vec { bytes } -/// Read content to a Blockstore. This function is benchmarked. -async fn read_content_benched(content: &[u8], mut store: Blockstore) { +/// Read/Write content to a Blockstore. This function is benchmarked. 
+async fn read_write_content_benched<W>(content: &[u8], mut store: Blockwriter<W>)
+where
+    W: AsyncWrite + AsyncSeek + Unpin,
+{
     let cursor = Cursor::new(content);
-    store.read(cursor).await.unwrap()
+    store.write_from(cursor).await.unwrap();
+    store.finish().await.unwrap();
 }
 
-fn read(c: &mut Criterion) {
+fn read_write(c: &mut Criterion) {
     let files = get_source_files();
 
     for (params, source_file, _) in files {
         let content = std::fs::read(&source_file).unwrap();
 
         c.bench_with_input(BenchmarkId::new("read", params), params, |b, _params| {
-            b.to_async(TokioExecutor::new().unwrap()).iter(|| {
-                read_content_benched(
-                    &content,
-                    Blockstore::with_parameters(Some(BLOCK_SIZE), None),
-                )
-            });
-        });
-    }
-}
-
-/// Write content from a Blockstore. This function is benchmarked.
-async fn write_contents_benched(buffer: Vec, store: Blockstore) {
-    store.write(buffer).await.unwrap();
-}
-
-fn write(c: &mut Criterion) {
-    let runtime = TokioExecutor::new().unwrap();
-    let files = get_source_files();
-
-    for (params, source_file, _) in files {
-        let mut blockstore = Blockstore::with_parameters(Some(BLOCK_SIZE), None);
-
-        // Read file contents to the blockstore
-        runtime.block_on(async {
-            let file = File::open(&source_file).await.unwrap();
-            blockstore.read(file).await.unwrap()
-        });
-
-        c.bench_with_input(BenchmarkId::new("write", params), &(), |b, _: &()| {
-            b.to_async(TokioExecutor::new().unwrap()).iter_batched(
-                || (blockstore.clone(), Vec::with_capacity(params.size)),
-                |(blockstore, buffer)| write_contents_benched(buffer, blockstore),
-                BatchSize::SmallInput,
-            );
+            b.to_async(TokioExecutor::new().unwrap())
+                .iter(|| read_write_content_benched(&content, Blockwriter::in_memory()));
         });
     }
 }
@@ -216,7 +192,6 @@ fn filestore(c: &mut Criterion) {
     }
 }
 
-criterion_group!(bench_reading, read);
-criterion_group!(bench_writing, write);
+criterion_group!(bench_reading, read_write);
 criterion_group!(bench_filestore, filestore);
-criterion_main!(bench_reading, bench_writing, bench_filestore);
+criterion_main!(bench_reading, bench_filestore);
diff --git a/mater/lib/src/chunker.rs b/mater/lib/src/chunker.rs
new file mode 100644
index 000000000..38841ffa6
--- /dev/null
+++ b/mater/lib/src/chunker.rs
@@ -0,0 +1,61 @@
+use bytes::{Bytes, BytesMut};
+use futures::Stream;
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+pub(crate) fn byte_stream_chunker<S>(
+    mut source: S,
+    chunk_size: usize,
+) -> impl Stream>
+where
+    S: AsyncRead + Unpin,
+{
+    // This custom stream gathers incoming buffers into a single byte chunk of `chunk_size`.
+    // `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt
+    // to fill its buffer before returning, defeating the whole purpose of properly sized chunks.
+    // There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist:
+    // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8
+    async_stream::try_stream! {
+        let mut buf = BytesMut::with_capacity(chunk_size);
+
+        loop {
+            if buf.capacity() < chunk_size {
+                // BytesMut::reserve *may* allocate more memory than requested to avoid further
+                // allocations, while that's very helpful, it's also unpredictable.
+                // If and when necessary, we can replace this with the following line:
+                // std::mem::replace(buf, BytesMut::with_capacity(chunk_size)):
+
+                // Reserve only the difference as the split may leave nothing, or something
+                buf.reserve(chunk_size - buf.capacity());
+            }
+
+            // If the read length is 0, we *assume* we reached EOF
+            // tokio's docs state that this does not mean we exhausted the reader,
+            // as it may be able to return more bytes later, *however*,
+            // this means there is no right way of knowing when the reader is fully exhausted!
+            // If we need to support a case like that, we just need to track how many times
+            // the reader returned 0 and break at a certain point
+            if source.read_buf(&mut buf).await? == 0 {
+                // EOF but there's still content to yield -> yield it
+                loop {
+                    // Due to the lack of guarantees on the resulting size of `BytesMut::reserve`
+                    // the buffer may contain more than `chunk_size`,
+                    // in that case we must yield the remaining complete chunks first
+                    let chunk = match buf.len() {
+                        0 => break,
+                        len if len <= chunk_size => buf.split(),
+                        _ => buf.split_to(chunk_size),
+                    };
+                    yield chunk.freeze();
+                }
+                break
+            } else if buf.len() >= chunk_size {
+                // The buffer may have a larger capacity than chunk_size due to reserve,
+                // this also means that our read may have read more bytes than we expected,
+                // that's why we check if the length is bigger than the chunk_size and if so
+                // we split the buffer to the chunk_size, then freeze and return
+                let chunk = buf.split_to(chunk_size);
+                yield chunk.freeze();
+            } // otherwise, the buffer is not full, so we don't do a thing
+        }
+    }
+}
diff --git a/mater/lib/src/file_reader.rs b/mater/lib/src/file_reader.rs
index 8b3d98475..67ab61d84 100644
--- a/mater/lib/src/file_reader.rs
+++ b/mater/lib/src/file_reader.rs
@@ -174,24 +174,18 @@ where
 mod test {
     use std::{io::Cursor, path::Path};
 
-    use crate::{Blockstore, CarExtractor};
+    use crate::CarExtractor;
 
-    /// Ensures that duplicated blocks
     #[tokio::test]
     async fn read_duplicated_blocks() {
-        let raw_input = std::iter::repeat(0).take(4096).collect::>();
-
-        let mut bs = Blockstore::with_parameters(Some(1024), None);
-        bs.read(Cursor::new(raw_input.clone())).await.unwrap();
-
-        // 1519 is the expected CAR file size after deduplicating and writing the CAR file
-        let mut out_car_buffer = Vec::with_capacity(1519);
-        bs.write(&mut out_car_buffer).await.unwrap();
-        assert_eq!(out_car_buffer.len(), 1519);
+        let raw_input = tokio::fs::read("tests/fixtures/original/zero")
+            .await
+            .unwrap();
 
-        let mut loader = CarExtractor::from_vec(out_car_buffer).await.unwrap();
+        let mut loader = CarExtractor::from_path("tests/fixtures/car_v2/zero.car")
+            .await
+            .unwrap();
         let root = loader.roots().await.unwrap()[0];
-        let mut out_check = Cursor::new(vec![1u8; 4096]);
 
         loader.copy_tree(&root, &mut out_check).await.unwrap();
diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs
index 2b8432589..c06888a69 100644
--- a/mater/lib/src/lib.rs
+++ b/mater/lib/src/lib.rs
@@ -12,6 +12,7 @@
 #![cfg_attr(not(test), deny(clippy::unwrap_used))]
 
 mod async_varint;
+mod chunker;
 mod cid;
 mod file_reader;
 mod multicodec;
@@ -25,7 +26,7 @@ pub use cid::{CidExt, MultihashExt};
 pub use file_reader::CarExtractor;
 pub use ipld_core::cid::Cid;
 pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
-pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
+pub use stores::{create_filestore, Blockwriter, Config, FileBlockstore};
 pub use v1::{BlockMetadata, Header as
CarV1Reader, Writer as CarV1Writer}; pub use v2::{ verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 3b6bb66f3..2a9c0cb73 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -1,370 +1,237 @@ -// NOTE(@jmg-duarte,28/05/2024): the blockstore can (and should) evolve to support other backends. -// At the time of writing, there is no need invest more time in it because the current PR(#25) is delayed enough. - -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{BTreeMap, HashMap}, + io::Cursor, +}; -use bytes::Bytes; -use indexmap::IndexMap; -use integer_encoding::VarInt; +use futures::stream::StreamExt; use ipld_core::cid::Cid; -use sha2::{Digest, Sha256}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_stream::StreamExt; -use tokio_util::io::ReaderStream; +use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH}; use crate::{ - multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, - Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, + unixfs::stream_balanced_tree, + v1::{self}, + v2, BlockMetadata, Error, Index, IndexEntry, IndexSorted, SingleWidthIndex, }; -/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory. -/// -/// The store will chunk data blocks into `chunk_size` and "gather" nodes in groups with at most `tree_width` children. -/// You can visualize the underlying tree in , using the "Balanced DAG" layout. -/// -/// It is necessary to keep the blocks somewhere before writing them to a file since the CARv2 header -/// has data size, index offset and indexes fields, all these requiring information that only becomes -/// "available" after you process all the blocks. -/// -/// The store keeps track of ([`Cid`], [`Bytes`]) pairs, performing de-duplication based on the [`Cid`]. -/// -/// **Important note: currently, the blockstore only supports a single file!** -#[derive(Debug, Clone)] -pub struct Blockstore { - root: Option, - blocks: IndexMap, - indexed: HashSet, - - chunk_size: usize, - tree_width: usize, +/// CAR file writer. +pub struct Blockwriter { + writer: v2::Writer, + index: HashMap, + roots: Vec, + started: bool, } -impl Blockstore { - /// The size of the [`Header`] when encoded using [`DagCborCodec`]. - /// - /// The formula is: `overhead + 37 * roots.len()`. - /// It is based on reversing the CBOR encoding, see an example: - /// ```text - /// A2 # map(2) - /// 65 # text(5) - /// 726F6F7473 # "roots" - /// 81 # array(1) - /// D8 2A # tag(42) - /// 58 25 # bytes(37) - /// 00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B - /// 67 # text(7) - /// 76657273696F6E # "version" - /// 01 # unsigned(1) - /// ``` - /// In this case we're always doing a single root, so we just use the fixed size: 58 - /// - /// Is this cheating? Yes. The alternative is to encode the CARv1 header twice. - /// We can cache it, but for now, this should be better. - const V1_HEADER_OVERHEAD: u64 = 58; - - /// Construct a new [`Blockstore`], using the default parameters. - pub fn new() -> Self { - Default::default() +impl Blockwriter { + /// Creates a new [`Blockwriter`] with the given `writer`. 
+ pub fn new(writer: W) -> Self { + Self { + writer: v2::Writer::new(writer), + index: HashMap::new(), + roots: Vec::with_capacity(1), + started: false, + } + } + + /// Creates a new [`v1::Header`]. + fn header_v1(&self) -> v1::Header { + // Lack of partial moves make this clone "required" + v1::Header::new(self.roots.clone()) + } + + /// Creates a new [`v2::Header`]. + fn header_v2(&self, v1_header: &v1::Header) -> v2::Header { + let total_block_encoded_len = self + .index + .values() + .map(BlockMetadata::encoded_len) + .sum::(); + + let v1_payload_len = v1_header.encoded_len() as u64 + total_block_encoded_len; + + v2::Header::new( + false, + v2::Header::SIZE, + v1_payload_len, + v2::Header::SIZE + v1_payload_len, + ) } +} - /// Construct a new [`Blockstore`], using custom parameters. - /// If set to `None`, the corresponding default value will be used. - pub fn with_parameters(chunk_size: Option, tree_width: Option) -> Self { - // NOTE(@jmg-duarte,28/05/2024): once the time comes, this method should probably be replaced with a builder +impl Blockwriter>> { + /// Creates an in-memory [`Blockwriter`]. + pub fn in_memory() -> Self { Self { - root: None, - blocks: IndexMap::new(), - indexed: HashSet::new(), - chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE), - tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH), + writer: v2::Writer::new(Cursor::new(vec![])), + index: HashMap::new(), + roots: Vec::with_capacity(1), + started: false, } } +} - /// Fully read the contents of an arbitrary `reader` into the [`Blockstore`], - /// converting the contents into a CARv2 file. - pub async fn read(&mut self, reader: R) -> Result<(), Error> +impl Blockwriter +where + W: AsyncWrite + AsyncSeek + Unpin, +{ + /// Writes the contents from `source`, adding a new root to [`Blockwriter`]. + pub async fn write_from(&mut self, source: S) -> Result<(), Error> where - R: AsyncRead + Unpin, + S: AsyncRead + Unpin, { - let chunks = ReaderStream::with_capacity(reader, self.chunk_size); - - // The `stream -> pin -> peekable` combo instead of `stream -> peekable -> pin` feels weird - // but it has to do with two things: - // - The fact that the stream can be self-referential: - // https://users.rust-lang.org/t/why-is-pin-mut-needed-for-iteration-of-async-stream/51107 - // - Using a tokio_stream::Peekable instead of futures::Peekable, they differ on who is required to be pinned - // - tokio_stream::Peekable::peek(&mut self) - // https://github.com/tokio-rs/tokio/blob/14c17fc09656a30230177b600bacceb9db33e942/tokio-stream/src/stream_ext/peekable.rs#L26-L37 - // - futures::Peekable::peek(self: Pin<&mut Self>) - // https://github.com/rust-lang/futures-rs/blob/c507ff833728e2979cf5519fc931ea97308ec876/futures-util/src/stream/stream/peek.rs#L38-L40 - let tree = stream_balanced_tree(chunks, self.tree_width); - tokio::pin!(tree); - let mut tree = tree.peekable(); - - while let Some(block) = tree.next().await { - let (cid, bytes) = block?; - self.insert(cid, bytes, true); - - // If the stream is exhausted, we know the current block is the root - if tree.peek().await.is_none() { - // The root should always be indexed, there's no official spec saying it should though, it just makes sense. 
- // So, if the insert line is changed, the root should be placed in the `indexed` structure here - self.root = Some(cid); - } + if !self.started { + self.writer.write_header(&Default::default()).await?; + self.writer.write_v1_header(&Default::default()).await?; + self.started = true; } - Ok(()) - } + let mut current_position = self.writer.get_inner_mut().stream_position().await?; + + let chunker = crate::chunker::byte_stream_chunker(source, DEFAULT_BLOCK_SIZE); + let nodes = stream_balanced_tree(chunker, DEFAULT_TREE_WIDTH).peekable(); + tokio::pin!(nodes); + + let mut root = None; + while let Some(node) = nodes.next().await { + let (node_cid, node_bytes) = node?; + let block_offset = current_position; + + if !self.index.contains_key(&node_cid) { + let written = self.writer.write_block(&node_cid, &node_bytes).await? as u64; + current_position += written; + self.index.insert( + node_cid, + BlockMetadata { + block_offset, + cid: node_cid, + data_offset_source: block_offset + written, + data_size: node_bytes.len() as u64, + }, + ); + } - /// Write the contents of the [`Blockstore`] as CARv2 to a writer. - pub async fn write(mut self, writer: W) -> Result - where - W: AsyncWrite + Unpin, - { - let mut position = 0; - - let mut writer = CarV2Writer::new(writer); - let header_v2 = self.header_v2(); - - // Writing the CARv1 starts where the CARv2 header ends - // this value is required for indexing, - // whose offset starts at the beginning of the CARv1 header - let car_v1_start = writer.write_header(&header_v2).await?; - position += car_v1_start; - - // CARv1 files are REQUIRED to have a root - let header_v1 = self - .root - .map(|root| CarV1Header::new(vec![root])) - .ok_or(Error::EmptyRootsError)?; - position += writer.write_v1_header(&header_v1).await?; - - let mut offsets = HashMap::new(); - for (cid, block) in self.blocks.drain(..) { - if self.indexed.contains(&cid) { - offsets.insert(cid, position - car_v1_start); + if nodes.as_mut().peek().await.is_none() { + root = Some(node_cid); } - position += writer.write_block(&cid, &block).await?; } - let count = offsets.len() as u64; - let entries = offsets - .into_iter() - .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64)) - .collect(); - let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( - SHA_256_CODE, - SingleWidthIndex::new(Sha256::output_size() as u32, count, entries).into(), - )); - position += writer.write_index(&index).await?; - - Ok(position) - } - - /// Get the [`CarV2Header`] that will be written out. - fn header_v2(&self) -> CarV2Header { - let data_offset = CarV2Header::SIZE; - let data_size: u64 = self - .blocks - .iter() - .map(|(cid, bytes)| { - let size = (cid.encoded_len() + bytes.len()) as u64; - let varint_size = size.required_space() as u64; - size + varint_size - }) - .sum(); - - let header_v1_varint = Self::V1_HEADER_OVERHEAD.required_space() as u64; - let car_v1_payload_length = Self::V1_HEADER_OVERHEAD + header_v1_varint + data_size; - - // If there is padding, this does not apply, however, the go-car tool doesn't seem to ever add padding - let index_offset = data_offset + car_v1_payload_length; - - // NOTE(@jmg-duarte,28/05/2024): the `fully_indexed` field is currently set to `false` as the - // go-car tool doesn't seem to ever set it, however, according to the written definition we have from the spec - // we're performing full indexing, as all blocks are inserted with `index: true`. 
- CarV2Header::new(false, data_offset, car_v1_payload_length, index_offset) - } - - /// Insert a new block into the [`Blockstore`]. - /// - /// If the [`Cid`] has been previously inserted, this function is a no-op. - fn insert(&mut self, cid: Cid, data: Bytes, index: bool) { - if !self.blocks.contains_key(&cid) { - self.blocks.insert_full(cid, data); - if index { - self.indexed.insert(cid); - } + match root { + Some(root) => self.roots.push(root), + None => return Err(Error::EmptyRootsError), } + + Ok(()) } -} -impl Default for Blockstore { - fn default() -> Self { - Self { - root: None, - blocks: IndexMap::new(), - indexed: HashSet::new(), - chunk_size: DEFAULT_BLOCK_SIZE, - tree_width: DEFAULT_TREE_WIDTH, + /// Writes the final header as well as the indexes, flushes the inner writer and returns it. + pub async fn finish(mut self) -> Result { + self.writer.get_inner_mut().rewind().await?; + + let v1_header = self.header_v1(); + let v2_header = self.header_v2(&v1_header); + + self.writer.write_header(&v2_header).await?; + self.writer.write_v1_header(&v1_header).await?; + + self.writer + .get_inner_mut() + .seek(std::io::SeekFrom::Start(v2_header.index_offset)) + .await?; + + // Abstracting away the index writing is not that simple because we have extra bookkeeping + let mut multihash_index: BTreeMap>> = BTreeMap::new(); + for (cid, metadata) in self.index { + let entry = IndexEntry::new( + metadata.cid.hash().digest().to_vec(), + metadata.block_offset - v2_header.data_offset, + ); + + let cid_hash_code = cid.hash().code(); + let cid_hash_digest_len = cid.hash().digest().len(); + + if let Some(single_width_index) = multihash_index.get_mut(&cid_hash_code) { + if let Some(entries) = single_width_index.get_mut(&cid_hash_digest_len) { + entries.push(entry); + } else { + single_width_index.insert(cid_hash_digest_len, vec![entry]); + } + } else { + let mut single_width_index = BTreeMap::new(); + single_width_index.insert(cid_hash_digest_len, vec![entry]); + multihash_index.insert(cid_hash_code, single_width_index); + }; } + let index = Index::multihash( + multihash_index + .into_iter() + .map(|(codec, index)| { + let index_sorted = index + .into_iter() + .map(|(width, entries)| { + SingleWidthIndex::new(width as u32, entries.len() as u64, entries) + }) + .collect::>(); + (codec, IndexSorted(index_sorted)) + }) + .collect(), + ); + self.writer.write_index(&index).await?; + + self.writer.finish().await } } #[cfg(test)] mod tests { - use std::{io::Cursor, str::FromStr}; + use std::{io::Cursor, path::Path}; - use ipld_core::{cid::Cid, codec::Codec}; - use ipld_dagpb::{DagPbCodec, PbNode}; - use sha2::{Digest, Sha256}; use tokio::fs::File; - use crate::{ - multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE}, - stores::blockstore::Blockstore, - test_utils::assert_buffer_eq, - CarV2Header, CarV2Reader, Index, - }; - - #[tokio::test] - async fn byte_eq_lorem() { - let file = File::open("tests/fixtures/original/lorem.txt") - .await - .unwrap(); - let mut store = Blockstore::new(); - store.read(file).await.unwrap(); - assert_eq!(store.blocks.len(), 1); + use crate::{stores::blockstore::Blockwriter, test_utils::assert_buffer_eq}; - let mut result_buffer = vec![]; - store.write(&mut result_buffer).await.unwrap(); + async fn byte_eq(original: P1, reference: P2) + where + P1: AsRef, + P2: AsRef, + { + let file = File::open(original).await.unwrap(); + let reference = tokio::fs::read(reference).await.unwrap(); - let car_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car") - .await - .unwrap(); + 
let mut store = Blockwriter::in_memory(); + store.write_from(file).await.unwrap(); + let output = store.finish().await.unwrap().into_inner(); - assert_buffer_eq!(&result_buffer, &car_contents); + assert_buffer_eq!(&output, &reference); } #[tokio::test] - async fn byte_eq_spaceglenda() { - let file = File::open("tests/fixtures/original/spaceglenda.jpg") - .await - .unwrap(); - let mut store = Blockstore::new(); - store.read(file).await.unwrap(); - assert_eq!(store.blocks.len(), 4); - - let mut result_buffer = vec![]; - store.write(&mut result_buffer).await.unwrap(); - - let car_contents = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car") - .await - .unwrap(); - - assert_buffer_eq!(&result_buffer, &car_contents); + async fn new_byte_eq_lorem() { + byte_eq( + "tests/fixtures/original/lorem.txt", + "tests/fixtures/car_v2/lorem.car", + ) + .await; } #[tokio::test] - async fn dedup_lorem() { - let file = File::open("tests/fixtures/original/lorem_4096_dup.txt") - .await - .unwrap(); - let mut store = Blockstore::with_parameters(Some(1024), None); - store.read(file).await.unwrap(); - // We're expecting there to exist a single data block and a root - assert_eq!(store.blocks.len(), 2); + async fn byte_eq_spaceglenda() { + byte_eq( + "tests/fixtures/original/spaceglenda.jpg", + "tests/fixtures/car_v2/spaceglenda.car", + ) + .await; } - // We can't fully validate this test using go-car because they don't offer parametrization #[tokio::test] - async fn dedup_lorem_roundtrip() { - let file = File::open("tests/fixtures/original/lorem_4096_dup.txt") - .await - .unwrap(); - let mut store = Blockstore::with_parameters(Some(1024), None); - store.read(file).await.unwrap(); - // We're expecting there to exist a single data block and a root - assert_eq!(store.blocks.len(), 2); - - let mut result_buffer = vec![]; - store.write(&mut result_buffer).await.unwrap(); - - let mut cursor = Cursor::new(result_buffer); - std::io::Seek::rewind(&mut cursor).unwrap(); - let mut car_reader = CarV2Reader::new(cursor); - - car_reader.read_pragma().await.unwrap(); - - let car_v2_header = car_reader.read_header().await.unwrap(); - assert_eq!(car_v2_header.data_offset, CarV2Header::SIZE); - // Extracted with go-car and validated with an hex viewer - // to extract the values, run the following commands: - // $ car inspect - // The dump is necessary because go-car does not support parametrization - assert_eq!(car_v2_header.data_size, 1358); - assert_eq!(car_v2_header.index_offset, CarV2Header::SIZE + 1358); - - let car_v1_header = car_reader.read_v1_header().await.unwrap(); - assert_eq!(car_v1_header.roots.len(), 1); - - // Extracted with go-car - let root_cid = - Cid::from_str("bafybeiapxsorxw7yqywquebgmlz37nyjt44vxlskhx6wcgirkurojow7xu").unwrap(); - assert_eq!(car_v1_header.roots[0], root_cid); - - let original_1024 = tokio::fs::read("tests/fixtures/original/lorem_1024.txt") + async fn dedup() { + let input = Cursor::new(vec![0u8; 524288]); + let mut store = Blockwriter::in_memory(); + store.write_from(input).await.unwrap(); + let output = store.finish().await.unwrap().into_inner(); + + let reference = tokio::fs::read("tests/fixtures/car_v2/zero.car") .await .unwrap(); - let (cid, data) = car_reader.read_block().await.unwrap(); - assert_buffer_eq!(&data, &original_1024); - let lorem_cid = Cid::new_v1(RAW_CODE, generate_multihash::(&original_1024)); - assert_eq!(cid, lorem_cid); - - let (cid, data) = car_reader.read_block().await.unwrap(); - let node: PbNode = DagPbCodec::decode_from_slice(&data).unwrap(); - assert_eq!(cid, 
root_cid); - - // There are 4 blocks of repeated 1024 bytes - assert_eq!(node.links.len(), 4); - - for pb_link in node.links { - assert_eq!(pb_link.cid, lorem_cid); - assert_eq!(pb_link.name, Some("".to_string())); - assert_eq!(pb_link.size, Some(1024)); - } - - let index = car_reader.read_index().await.unwrap(); - - match index { - Index::MultihashIndexSorted(index) => { - // There's only Sha256 - assert_eq!(index.len(), 1); - - let index_sorted = &index[&SHA_256_CODE]; - // There's only a single length - assert_eq!(index_sorted.len(), 1); - - let single_width_index = &index_sorted[0]; - assert_eq!(single_width_index.count, 2); - // Sha256 output size (32) + the offset size (8) - assert_eq!(single_width_index.width, Sha256::output_size() as u32 + 8); - assert_eq!(single_width_index.entries.len(), 2); - - // Sorting order is byte-wise, I extracted it manually - assert_eq!(single_width_index.entries[0].offset, 1121); - assert_eq!(single_width_index.entries[1].offset, 59); - assert_eq!( - single_width_index.entries[0].digest, - root_cid.hash().digest() - ); - assert_eq!( - single_width_index.entries[1].digest, - lorem_cid.hash().digest() - ); - } - Index::IndexSorted(_) => panic!("expected index to be MultihashIndexSorted"), - } + assert_buffer_eq!(&output, &reference); } } diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs index b8e494a21..704add767 100644 --- a/mater/lib/src/stores/filestore.rs +++ b/mater/lib/src/stores/filestore.rs @@ -1,8 +1,7 @@ -use bytes::BytesMut; use futures::stream::StreamExt; use ipld_core::cid::Cid; use sha2::{Digest, Sha256}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; use super::Config; use crate::{ @@ -11,7 +10,7 @@ use crate::{ }; async fn balanced_import( - mut source: Src, + source: Src, mut output: Out, chunk_size: usize, tree_width: usize, @@ -20,49 +19,7 @@ where Src: AsyncRead + Unpin, Out: AsyncWrite + AsyncSeek + Unpin, { - // This custom stream gathers incoming buffers into a single byte chunk of `chunk_size` - // `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt - // to fill it's buffer before returning, voiding the whole promise of properly sized chunks - // There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist: - // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8 - let chunker = async_stream::try_stream! { - let mut buf = BytesMut::with_capacity(chunk_size); - - loop { - if buf.capacity() < chunk_size { - // BytesMut::reserve *may* allocate more memory than requested to avoid further - // allocations, while that's very helpful, it's also unpredictable. - // If and when necessary, we can replace this with the following line: - // std::mem::replace(buf, BytesMut::with_capacity(chunk_size)): - - // Reserve only the difference as the split may leave nothing, or something - buf.reserve(chunk_size - buf.capacity()); - } - - // If the read length is 0, we *assume* we reached EOF - // tokio's docs state that this does not mean we exhausted the reader, - // as it may be able to return more bytes later, *however*, - // this means there is no right way of knowing when the reader is fully exhausted! - // If we need to support a case like that, we just need to track how many times - // the reader returned 0 and break at a certain point - if source.read_buf(&mut buf).await? 
== 0 { - // EOF but there's still content to yield -> yield it - if buf.len() > 0 { - let chunk = buf.split(); - yield chunk.freeze(); - } - break - } else if buf.len() >= chunk_size { - // The buffer may have a larger capacity than chunk_size due to reserve - // this also means that our read may have read more bytes than we expected, - // thats why we check if the length if bigger than the chunk_size and if so - // we split the buffer to the chunk_size, then freeze and return - let chunk = buf.split_to(chunk_size); - yield chunk.freeze(); - } // otherwise, the buffer is not full, so we don't do a thing - } - }; - + let chunker = crate::chunker::byte_stream_chunker(source, chunk_size); let nodes = stream_balanced_tree(chunker, tree_width).peekable(); tokio::pin!(nodes); diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs index 66920d97d..ffb6d9d24 100644 --- a/mater/lib/src/stores/mod.rs +++ b/mater/lib/src/stores/mod.rs @@ -2,7 +2,7 @@ mod blockstore; mod file; mod filestore; -pub use blockstore::Blockstore; +pub use blockstore::Blockwriter; pub use file::FileBlockstore; pub use filestore::create_filestore; diff --git a/mater/lib/src/unixfs/mod.rs b/mater/lib/src/unixfs/mod.rs index 544e0a37c..e4a0ab1b5 100644 --- a/mater/lib/src/unixfs/mod.rs +++ b/mater/lib/src/unixfs/mod.rs @@ -7,12 +7,11 @@ use std::collections::VecDeque; use async_stream::try_stream; use bytes::Bytes; -use futures::TryStreamExt; +use futures::{Stream, StreamExt, TryStreamExt}; use ipld_core::{cid::Cid, codec::Codec}; use ipld_dagpb::{DagPbCodec, PbLink, PbNode}; use quick_protobuf::MessageWrite; use sha2::Sha256; -use tokio_stream::{Stream, StreamExt}; use crate::{ multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, @@ -339,8 +338,6 @@ mod tests { //! and there's enough tests around the repo to ensure that if the underlying //! bytes are equal, the expected block sizes are as well. //! - //! We also didn't write our own chunker, relying on [`tokio_util::io::ReadStream`] instead. - //! //! [beetle]: https://github.com/n0-computer/beetle/blob/3e137cb2bc18e1d458c3f72d5e817b03d9537d5d/iroh-unixfs/src/balanced_tree.rs#L234-L507 use bytes::BytesMut; diff --git a/mater/lib/src/v1/mod.rs b/mater/lib/src/v1/mod.rs index 67e442cfa..c949d8716 100644 --- a/mater/lib/src/v1/mod.rs +++ b/mater/lib/src/v1/mod.rs @@ -1,6 +1,7 @@ mod reader; mod writer; +use integer_encoding::VarInt; use ipld_core::cid::{multihash::Multihash, Cid}; use serde::{Deserialize, Serialize}; @@ -38,6 +39,73 @@ impl Header { pub fn new(roots: Vec) -> Self { Self { version: 1, roots } } + + /// The static components of the CBOR encoded [`Header`]. 
+ /// + /// The following is a CBOR encoded [`Header`] with a single CID: + /// ```text + /// A2 # map(2) + /// 65 # text(5) + /// 726F6F7473 # "roots" + /// 81 # array(1) + /// D8 2A # tag(42) + /// 58 25 # bytes(37) + /// 00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B + /// 67 # text(7) + /// 76657273696F6E # "version" + /// 01 # unsigned(1) + /// ``` + /// + /// When calculating the CBOR encoded length, the only thing that changes are the amount of + /// elements inside the array, as such, the static overhead is everything *but*: + /// ```text + /// D8 2A # tag(42) + /// 58 25 # bytes(37) + /// 00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B + /// ``` + const fn cbor_static_overhead() -> usize { + 1 + // map + 1 + // text + 5 + // "roots" + 1 + // array + 1 + // text + 7 + // "version" + 1 // unsigned(1) + } + + /// The length of the CBOR encoded [`Cid`]. + const fn cbor_cid_encoded_len() -> usize { + 2 + // tag(42) + 2 + // bytes + 37 // + } + + /// Returns the encoded length of the header, including the VarInt size prefix. + /// The size of the [`Header`] when encoded using [`DagCborCodec`]. + /// + /// The formula is: `overhead + 41 * roots.len()`. + /// It is based on reversing the CBOR encoding, see an example: + /// ```text + /// A2 # map(2) + /// 65 # text(5) + /// 726F6F7473 # "roots" + /// 81 # array(1) + /// D8 2A # tag(42) + /// 58 25 # bytes(37) + /// 00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B + /// 67 # text(7) + /// 76657273696F6E # "version" + /// 01 # unsigned(1) + /// ``` + /// In this case we're always doing a single root, so we just use the fixed size: 58 + /// + /// Is this cheating? Yes. The alternative is to encode the CARv1 header twice. + /// We can cache it, but for now, this should be better. + pub fn encoded_len(&self) -> usize { + let header_encoded_len = + Self::cbor_static_overhead() + Self::cbor_cid_encoded_len() * self.roots.len(); + header_encoded_len.required_space() + header_encoded_len + } } impl Default for Header { @@ -75,6 +143,14 @@ pub struct BlockMetadata { pub data_size: u64, } +impl BlockMetadata { + /// The length of the encoded block, including the VarInt prefix and CID. + pub fn encoded_len(&self) -> u64 { + let len = self.cid.encoded_len() as u64 + self.data_size; + len.required_space() as u64 + len + } +} + #[cfg(test)] mod tests { use std::io::Cursor; diff --git a/mater/lib/src/v2/index.rs b/mater/lib/src/v2/index.rs index 60ecf6452..33c37456b 100644 --- a/mater/lib/src/v2/index.rs +++ b/mater/lib/src/v2/index.rs @@ -140,7 +140,7 @@ impl TryFrom> for SingleWidthIndex { /// /// For more details, read the [`Format 0x0400: IndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted) section in the CARv2 specification. #[derive(Debug, PartialEq, Eq)] -pub struct IndexSorted(Vec); +pub struct IndexSorted(pub Vec); impl Deref for IndexSorted { type Target = Vec; diff --git a/mater/lib/src/v2/mod.rs b/mater/lib/src/v2/mod.rs index a884e5b32..4696fec0a 100644 --- a/mater/lib/src/v2/mod.rs +++ b/mater/lib/src/v2/mod.rs @@ -64,7 +64,7 @@ impl Default for Characteristics { pub struct Header { /// Describes certain features of the enclosed data. pub characteristics: Characteristics, - /// Byte-offset from the beginning of the CARv2 pragma to the first byte of the CARv1 data payload. + /// Byte-offset from the beginning of the CARv2 pragma to the first byte of the CARv1 payload. pub data_offset: u64, /// Byte-length of the CARv1 data payload. 
pub data_size: u64, diff --git a/mater/lib/src/v2/writer.rs b/mater/lib/src/v2/writer.rs index e27c5da44..e1df3ac4f 100644 --- a/mater/lib/src/v2/writer.rs +++ b/mater/lib/src/v2/writer.rs @@ -100,16 +100,16 @@ where mod tests { use std::{collections::BTreeMap, io::Cursor}; + use futures::StreamExt; use ipld_core::cid::Cid; use sha2::Sha256; use tokio::{ fs::File, io::{AsyncSeekExt, BufWriter}, }; - use tokio_stream::StreamExt; - use tokio_util::io::ReaderStream; use crate::{ + chunker::byte_stream_chunker, multicodec::{generate_multihash, MultihashCode, RAW_CODE}, test_utils::assert_buffer_eq, unixfs::stream_balanced_tree, @@ -244,11 +244,11 @@ mod tests { .await .unwrap(); // https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13 - let file_chunker = ReaderStream::with_capacity(file, 1024 * 256); + let file_chunker = byte_stream_chunker(file, 1024 * 256); let nodes = stream_balanced_tree(file_chunker, 11) - .collect::, _>>() - .await - .unwrap(); + .map(|res| res.unwrap()) + .collect::>() + .await; // To simplify testing, the values were extracted using `car inspect` writer diff --git a/mater/lib/tests/fixtures/car_v2/zero.car b/mater/lib/tests/fixtures/car_v2/zero.car new file mode 100644 index 0000000000000000000000000000000000000000..3df0cef57016e5d60309045f56cb22b58f6360c6 GIT binary patch literal 262549 zcmeIvJxgOj6adio)PM$DBimqQ}sGA^wJ!E=zA+1{JSDN zEe(VVQM{k7pWhxHUCd23Q^mdG)^qpOOl4|!HS<~xvX%MrmqC;S2oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N z0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ z009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7csfuJ0Q8&TX!TrSohYumNy zo3s90Bi*=Lf2v+5hF+TE4}EWigJ>+73KybyKV3h+Jv_RYn{1|vd&jNk?yH%~)b48L zwHjnA^W`WQER_cSuRHmpx2t_Bj@u_cVy8E3|4Y>Qn~!|79hpx3-2}fnec!!kDd_p^ FdYwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ 
z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK zfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5 zV8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM z7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b* z1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd z0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwA zz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEj zFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r z3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@ z0|pEjFkrxd0RsjM7%*VKfB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM7%*VK VfB^#r3>YwAz<>b*1`HVZ0|Nkf00961 literal 0 HcmV?d00001
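
Usage sketch of the `Blockwriter` API introduced by this patch, mirroring the in-memory tests above; the `to_car_bytes` helper is illustrative and not part of the crate.

use std::io::Cursor;

use mater::Blockwriter;

// Stream a payload into an in-memory CARv2 archive and return the encoded bytes.
// `write_from` chunks and deduplicates the source, recording one root per call;
// `finish` rewrites the CARv2/CARv1 headers, writes the index and returns the inner writer.
async fn to_car_bytes(payload: Vec<u8>) -> Vec<u8> {
    let mut writer = Blockwriter::in_memory();
    writer.write_from(Cursor::new(payload)).await.unwrap();
    writer.finish().await.unwrap().into_inner()
}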