feat(mater): replace blockstore with a more general abstraction (#772)
jmg-duarte authored Feb 21, 2025
1 parent 52bea45 commit ad77e05
Showing 18 changed files with 365 additions and 448 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions mater/lib/Cargo.toml
@@ -26,9 +26,7 @@ serde = { workspace = true, features = ["derive"] }
serde_ipld_dagcbor.workspace = true
sha2 = { workspace = true, default-features = true }
thiserror.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["io"] }
tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread", "sync"] }

# Optional dependencies
blockstore = { workspace = true, optional = true }
63 changes: 19 additions & 44 deletions mater/lib/benches/benchmark.rs
@@ -5,11 +5,15 @@ use std::{
sync::OnceLock,
};

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use mater::{create_filestore, Blockstore, Config};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use mater::{create_filestore, Blockwriter, Config};
use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng};
use tempfile::{tempdir, TempDir};
use tokio::{fs::File, runtime::Runtime as TokioExecutor};
use tokio::{
fs::File,
io::{AsyncSeek, AsyncWrite},
runtime::Runtime as TokioExecutor,
};

static FILES: OnceLock<Vec<(Params, PathBuf, TempDir)>> = OnceLock::new();
fn get_source_files() -> &'static Vec<(Params, PathBuf, TempDir)> {
@@ -131,53 +135,25 @@ fn generate_content(params: &Params) -> Vec<u8> {
bytes
}

/// Read content to a Blockstore. This function is benchmarked.
async fn read_content_benched(content: &[u8], mut store: Blockstore) {
/// Read/write content through a Blockwriter. This function is benchmarked.
async fn read_write_content_benched<W>(content: &[u8], mut store: Blockwriter<W>)
where
W: AsyncWrite + AsyncSeek + Unpin,
{
let cursor = Cursor::new(content);
store.read(cursor).await.unwrap()
store.write_from(cursor).await.unwrap();
store.finish().await.unwrap();
}

fn read(c: &mut Criterion) {
fn read_write(c: &mut Criterion) {
let files = get_source_files();

for (params, source_file, _) in files {
let content = std::fs::read(&source_file).unwrap();

c.bench_with_input(BenchmarkId::new("read", params), params, |b, _params| {
b.to_async(TokioExecutor::new().unwrap()).iter(|| {
read_content_benched(
&content,
Blockstore::with_parameters(Some(BLOCK_SIZE), None),
)
});
});
}
}

/// Write content from a Blockstore. This function is benchmarked.
async fn write_contents_benched(buffer: Vec<u8>, store: Blockstore) {
store.write(buffer).await.unwrap();
}

fn write(c: &mut Criterion) {
let runtime = TokioExecutor::new().unwrap();
let files = get_source_files();

for (params, source_file, _) in files {
let mut blockstore = Blockstore::with_parameters(Some(BLOCK_SIZE), None);

// Read file contents to the blockstore
runtime.block_on(async {
let file = File::open(&source_file).await.unwrap();
blockstore.read(file).await.unwrap()
});

c.bench_with_input(BenchmarkId::new("write", params), &(), |b, _: &()| {
b.to_async(TokioExecutor::new().unwrap()).iter_batched(
|| (blockstore.clone(), Vec::with_capacity(params.size)),
|(blockstore, buffer)| write_contents_benched(buffer, blockstore),
BatchSize::SmallInput,
);
b.to_async(TokioExecutor::new().unwrap())
.iter(|| read_write_content_benched(&content, Blockwriter::in_memory()));
});
}
}
@@ -216,7 +192,6 @@ fn filestore(c: &mut Criterion) {
}
}

criterion_group!(bench_reading, read);
criterion_group!(bench_writing, write);
criterion_group!(bench_reading, read_write);
criterion_group!(bench_filestore, filestore);
criterion_main!(bench_reading, bench_writing, bench_filestore);
criterion_main!(bench_reading, bench_filestore);
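
The rewritten benchmark doubles as a usage example for the new abstraction. A minimal end-to-end sketch, assuming only the API exercised above (Blockwriter::in_memory, write_from, finish); the payload and its size are illustrative, and write_from is assumed to accept any AsyncRead source, as the Cursor usage in the benchmark suggests:

use std::io::Cursor;

use mater::Blockwriter;

#[tokio::main]
async fn main() {
    // Illustrative payload; any AsyncRead source should work the same way
    let payload = vec![0u8; 4096];

    // Write into an in-memory buffer; Blockwriter is generic over
    // AsyncWrite + AsyncSeek, as the benchmark's trait bounds show
    let mut writer = Blockwriter::in_memory();
    writer.write_from(Cursor::new(payload)).await.unwrap();

    // Finalize the output, matching the finish() call in the benchmark
    writer.finish().await.unwrap();
}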
61 changes: 61 additions & 0 deletions mater/lib/src/chunker.rs
@@ -0,0 +1,61 @@
use bytes::{Bytes, BytesMut};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncReadExt};

pub(crate) fn byte_stream_chunker<S>(
mut source: S,
chunk_size: usize,
) -> impl Stream<Item = std::io::Result<Bytes>>
where
S: AsyncRead + Unpin,
{
// This custom stream gathers incoming buffers into a single byte chunk of `chunk_size`
// `tokio_util::io::ReaderStream` does a very similar thing; however, it does not attempt
// to fill its buffer before returning, voiding the whole promise of properly sized chunks.
// There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist:
// https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8
async_stream::try_stream! {
let mut buf = BytesMut::with_capacity(chunk_size);

loop {
if buf.capacity() < chunk_size {
// BytesMut::reserve *may* allocate more memory than requested to avoid further
// allocations; while that's very helpful, it's also unpredictable.
// If and when necessary, we can replace this with the following line:
// std::mem::replace(buf, BytesMut::with_capacity(chunk_size));

// Reserve only the difference, as the split may leave nothing or some capacity behind
buf.reserve(chunk_size - buf.capacity());
}

// If the read length is 0, we *assume* we reached EOF.
// tokio's docs state that this does not mean the reader is exhausted,
// as it may be able to return more bytes later; *however*,
// this means there is no right way of knowing when the reader is fully exhausted!
// If we need to support a case like that, we just need to track how many times
// the reader returned 0 and break at a certain point
if source.read_buf(&mut buf).await? == 0 {
// EOF but there's still content to yield -> yield it
loop {
// Due to the lack of guarantees on the resulting size of `BytesMut::reserve`,
// the buffer may contain more than `chunk_size`;
// in that case we must yield the remaining complete chunks first
let chunk = match buf.len() {
0 => break,
len if len <= chunk_size => buf.split(),
_ => buf.split_to(chunk_size),
};
yield chunk.freeze();
}
break
} else if buf.len() >= chunk_size {
// The buffer may have a larger capacity than chunk_size due to reserve;
// this also means that our read may have read more bytes than we expected.
// That's why we check if the length is bigger than the chunk_size and, if so,
// split the buffer to the chunk_size, then freeze and return it
let chunk = buf.split_to(chunk_size);
yield chunk.freeze();
} // otherwise, the buffer is not full, so we don't do a thing
}
}
}
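
The invariant here is that every yielded chunk except the last is exactly chunk_size bytes. A crate-internal test along these lines would pin that down (a sketch; the module, sizes, and test name are illustrative):

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use futures::StreamExt;

    use super::*;

    #[tokio::test]
    async fn yields_full_chunks_then_remainder() {
        // 2500 bytes at a chunk size of 1024 should come out as 1024, 1024, 452
        let source = Cursor::new(vec![0u8; 2500]);
        let stream = byte_stream_chunker(source, 1024);
        tokio::pin!(stream);

        let mut sizes = Vec::new();
        while let Some(chunk) = stream.next().await {
            sizes.push(chunk.unwrap().len());
        }
        assert_eq!(sizes, vec![1024, 1024, 452]);
    }
}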
20 changes: 7 additions & 13 deletions mater/lib/src/file_reader.rs
@@ -174,24 +174,18 @@
mod test {
use std::{io::Cursor, path::Path};

use crate::{Blockstore, CarExtractor};
use crate::CarExtractor;

/// Ensures that duplicated blocks are read back correctly.
#[tokio::test]
async fn read_duplicated_blocks() {
let raw_input = std::iter::repeat(0).take(4096).collect::<Vec<u8>>();

let mut bs = Blockstore::with_parameters(Some(1024), None);
bs.read(Cursor::new(raw_input.clone())).await.unwrap();

// 1519 is the expected CAR file size after deduplicating and writing the CAR file
let mut out_car_buffer = Vec::with_capacity(1519);
bs.write(&mut out_car_buffer).await.unwrap();
assert_eq!(out_car_buffer.len(), 1519);
let raw_input = tokio::fs::read("tests/fixtures/original/zero")
.await
.unwrap();

let mut loader = CarExtractor::from_vec(out_car_buffer).await.unwrap();
let mut loader = CarExtractor::from_path("tests/fixtures/car_v2/zero.car")
.await
.unwrap();
let root = loader.roots().await.unwrap()[0];

let mut out_check = Cursor::new(vec![1u8; 4096]);
loader.copy_tree(&root, &mut out_check).await.unwrap();

3 changes: 2 additions & 1 deletion mater/lib/src/lib.rs
@@ -12,6 +12,7 @@
#![cfg_attr(not(test), deny(clippy::unwrap_used))]

mod async_varint;
mod chunker;
mod cid;
mod file_reader;
mod multicodec;
@@ -25,7 +26,7 @@ pub use cid::{CidExt, MultihashExt};
pub use file_reader::CarExtractor;
pub use ipld_core::cid::Cid;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
pub use stores::{create_filestore, Blockwriter, Config, FileBlockstore};
pub use v1::{BlockMetadata, Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,