Skip to content

Commit

Permalink
feat(mater): remove filestore abstraction in favor of blockwriter (#773)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmg-duarte authored Feb 24, 2025
1 parent 53fa203 commit 0992f42
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 176 deletions.
6 changes: 3 additions & 3 deletions mater/cli/src/convert.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::PathBuf;

use mater::{create_filestore, Cid, Config, Error};
use mater::{Blockwriter, Cid, Error};
use tokio::fs::File;

/// Converts a file at location `input_path` to a CARv2 file at `output_path`
Expand All @@ -16,10 +16,10 @@ pub(crate) async fn convert_file_to_car(
}?;

if input_path.as_os_str() == "-" {
create_filestore(tokio::io::stdin(), output_file, Config::default()).await
Blockwriter::import(tokio::io::stdin(), output_file).await
} else {
let source_file = File::open(input_path).await?;
create_filestore(source_file, output_file, Config::default()).await
Blockwriter::import(source_file, output_file).await
}
}

Expand Down
18 changes: 8 additions & 10 deletions mater/lib/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use mater::{create_filestore, Blockwriter, Config};
use mater::Blockwriter;
use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng};
use tempfile::{tempdir, TempDir};
use tokio::{
Expand Down Expand Up @@ -169,29 +169,27 @@ fn prepare_source_file(content: &[u8]) -> (TempDir, PathBuf) {
(temp_dir, file)
}

/// Create a filestore. This function is benchmarked.
async fn create_filestore_benched(source: &Path, target: &Path) {
/// Import a source into a writer. This function is benchmarked.
async fn blockwriter_import(source: &Path, target: &Path) {
let source_file = File::open(source).await.unwrap();
let output_file = File::create(target).await.unwrap();

create_filestore(source_file, output_file, Config::default())
.await
.unwrap();
Blockwriter::import(source_file, output_file).await.unwrap();
}

fn filestore(c: &mut Criterion) {
fn import(c: &mut Criterion) {
let files = get_source_files();

for (params, source_file, temp_dir) in files {
let target_file = temp_dir.path().join("target");

c.bench_with_input(BenchmarkId::new("filestore", params), &(), |b, _: &()| {
b.to_async(TokioExecutor::new().unwrap())
.iter(|| create_filestore_benched(&source_file, &target_file));
.iter(|| blockwriter_import(&source_file, &target_file));
});
}
}

criterion_group!(bench_reading, read_write);
criterion_group!(bench_filestore, filestore);
criterion_main!(bench_reading, bench_filestore);
criterion_group!(bench_import, import);
criterion_main!(bench_reading, bench_import);
10 changes: 5 additions & 5 deletions mater/lib/src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,18 @@ mod test {

#[tokio::test]
async fn read_duplicated_blocks() {
let raw_input = tokio::fs::read("tests/fixtures/original/zero")
.await
.unwrap();

let mut loader = CarExtractor::from_path("tests/fixtures/car_v2/zero.car")
.await
.unwrap();
let root = loader.roots().await.unwrap()[0];
let mut out_check = Cursor::new(vec![1u8; 4096]);
loader.copy_tree(&root, &mut out_check).await.unwrap();

assert_eq!(raw_input, out_check.into_inner());
let expected = [0u8; 524288].as_slice();
let inner = out_check.into_inner();
let result = inner.as_slice();

assert_eq!(expected, result);
}

async fn load_and_compare<P1, P2>(original: P1, path: P2)
Expand Down
2 changes: 1 addition & 1 deletion mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use cid::{CidExt, MultihashExt};
pub use file_reader::CarExtractor;
pub use ipld_core::cid::Cid;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockwriter, Config, FileBlockstore};
pub use stores::{Blockwriter, Config, FileBlockstore};
pub use v1::{BlockMetadata, Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
Expand Down
30 changes: 26 additions & 4 deletions mater/lib/src/stores/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use futures::stream::StreamExt;
use ipld_core::cid::Cid;
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite};

use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH};
use crate::{
unixfs::stream_balanced_tree,
v1::{self},
v2, BlockMetadata, Error, Index, IndexEntry, IndexSorted, SingleWidthIndex,
v2, BlockMetadata, Config, Error, Index, IndexEntry, IndexSorted, SingleWidthIndex,
};

/// CAR file writer.
Expand All @@ -20,6 +19,7 @@ pub struct Blockwriter<W> {
index: HashMap<Cid, BlockMetadata>,
roots: Vec<Cid>,
started: bool,
config: Config,
}

impl<W> Blockwriter<W> {
Expand All @@ -30,6 +30,7 @@ impl<W> Blockwriter<W> {
index: HashMap::new(),
roots: Vec::with_capacity(1),
started: false,
config: Default::default(),
}
}

Expand Down Expand Up @@ -66,6 +67,7 @@ impl Blockwriter<Cursor<Vec<u8>>> {
index: HashMap::new(),
roots: Vec::with_capacity(1),
started: false,
config: Default::default(),
}
}
}
Expand All @@ -74,6 +76,19 @@ impl<W> Blockwriter<W>
where
W: AsyncWrite + AsyncSeek + Unpin,
{
/// Convert `source` into a CAR file, writing it to `writer`.
/// Returns the root [`Cid`].
pub async fn import<S>(source: S, writer: W) -> Result<Cid, Error>
where
S: AsyncRead + Unpin,
{
let mut writer = Self::new(writer);
writer.write_from(source).await?;
let root = *writer.roots.first().ok_or(Error::EmptyRootsError)?;
writer.finish().await?;
Ok(root)
}

/// Writes the contents from `source`, adding a new root to [`Blockwriter`].
pub async fn write_from<S>(&mut self, source: S) -> Result<(), Error>
where
Expand All @@ -87,8 +102,15 @@ where

let mut current_position = self.writer.get_inner_mut().stream_position().await?;

let chunker = crate::chunker::byte_stream_chunker(source, DEFAULT_BLOCK_SIZE);
let nodes = stream_balanced_tree(chunker, DEFAULT_TREE_WIDTH).peekable();
let nodes = match self.config {
Config::Balanced {
chunk_size,
tree_width,
} => {
let chunker = crate::chunker::byte_stream_chunker(source, chunk_size);
stream_balanced_tree(chunker, tree_width).peekable()
}
};
tokio::pin!(nodes);

let mut root = None;
Expand Down
150 changes: 0 additions & 150 deletions mater/lib/src/stores/filestore.rs

This file was deleted.

2 changes: 0 additions & 2 deletions mater/lib/src/stores/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
mod blockstore;
mod file;
mod filestore;

pub use blockstore::Blockwriter;
pub use file::FileBlockstore;
pub use filestore::create_filestore;

/// The default block size, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13).
Expand Down
Binary file removed mater/lib/tests/fixtures/original/zero
Binary file not shown.
2 changes: 1 addition & 1 deletion storage-provider/server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ where
// Stream the body from source to the temp file.
let file = File::create(&temp_file_path).await?;
let writer = BufWriter::new(file);
let cid = mater::create_filestore(source, writer, mater::Config::default()).await?;
let cid = mater::Blockwriter::import(source, writer).await?;
tracing::trace!("finished writing the CAR archive");

// If the file is successfully written, we can now move it to the final
Expand Down

0 comments on commit 0992f42

Please sign in to comment.