Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support unixfs dag in mater #728

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 21 additions & 21 deletions mater/cli/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::error::Error;
pub(crate) async fn convert_file_to_car(
input_path: &PathBuf,
output_path: &PathBuf,
config: Config,
overwrite: bool,
) -> Result<Cid, Error> {
let source_file = File::open(input_path).await?;
Expand All @@ -17,7 +18,7 @@ pub(crate) async fn convert_file_to_car(
} else {
File::create_new(output_path).await
}?;
let cid = create_filestore(source_file, output_file, Config::default()).await?;
let cid = create_filestore(source_file, output_file, config).await?;

Ok(cid)
}
Expand All @@ -29,14 +30,12 @@ mod tests {
use std::str::FromStr;

use anyhow::Result;
use mater::Cid;
use mater::{Cid, Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};
use tempfile::tempdir;
use tokio::{fs::File, io::AsyncWriteExt};

use crate::{convert::convert_file_to_car, error::Error};

#[tokio::test]
async fn convert_file_to_car_success() -> Result<()> {
async fn convert_file_to_car_raw_success() -> Result<()> {
// Setup: Create a dummy input file
let temp_dir = tempdir()?;
let input_path = temp_dir.path().join("test_input.txt");
Expand All @@ -49,8 +48,11 @@ mod tests {
// Define output path
let output_path = temp_dir.path().join("test_output.car");

// Configure in raw mode
let config = Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH);

// Call the function under test
let result = convert_file_to_car(&input_path, &output_path, false).await;
let result = super::convert_file_to_car(&input_path, &output_path, config, false).await;

// Assert the result is Ok
assert!(result.is_ok());
Expand All @@ -73,15 +75,11 @@ mod tests {
// Define output path
let output_path = temp_dir.path().join("test_output.car");

// Call the function under test
let result = convert_file_to_car(&input_path, &output_path, false).await;

// Assert the result is an error
assert!(result.is_err());
assert!(matches!(result, Err(Error::IoError(..))));
let config = Config::default();

// Close temporary directory
temp_dir.close()?;
// Call the function under test
let result = super::convert_file_to_car(&input_path, &output_path, config, false).await;
assert!(matches!(result, Err(super::Error::IoError(..))));

Ok(())
}
Expand All @@ -94,21 +92,23 @@ mod tests {
let mut input_file = File::create(&input_path).await?;
tokio::io::AsyncWriteExt::write_all(&mut input_file, b"test data").await?;

// Create output file
// Create output file so that the file already exists.
// Since we are not allowing overwrites (overwrite = false), this should trigger an error.
let output_path = temp_dir.path().join("output_file");
File::create_new(&output_path).await?;
println!("gets here");

// Call the function under test
let result = convert_file_to_car(&input_path, &output_path, false).await;
// Provide a configuration (using default in this example).
let config = Config::default();

// Assert the result is an error
// Call the function under test with the config and overwrite flag set to false.
let result = super::convert_file_to_car(&input_path, &output_path, config, false).await;

// Assert the result is an error, specifically an IoError.
assert!(result.is_err());
assert!(matches!(result, Err(Error::IoError(..))));
assert!(matches!(result, Err(super::Error::IoError(..))));

// Close temporary directory
temp_dir.close()?;

Ok(())
}
}
30 changes: 22 additions & 8 deletions mater/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::path::PathBuf;

use clap::Parser;
use mater::{Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};

use crate::{convert::convert_file_to_car, error::Error, extract::extract_file_from_car};

mod convert;
mod error;
mod extract;
Expand All @@ -26,14 +26,24 @@ enum MaterCli {
#[arg(short, long, action)]
quiet: bool,

/// If enabled, the output will overwrite any existing files.
/// If enabled, content will be stored directly without UnixFS wrapping.
#[arg(long, action)]
overwrite: bool,
raw: bool,

/// Size of each chunk in bytes.
#[arg(long, default_value_t = DEFAULT_CHUNK_SIZE)]
chunk_size: usize,

/// Maximum number of children per parent node.
#[arg(long, default_value_t = DEFAULT_TREE_WIDTH)]
tree_width: usize,
},

/// Convert a CARv2 file to its original format
Extract {
/// Path to CARv2 file
input_path: PathBuf,

/// Path to output file
output_path: Option<PathBuf>,
},
Expand All @@ -46,14 +56,20 @@ async fn main() -> Result<(), Error> {
input_path,
output_path,
quiet,
overwrite,
raw,
chunk_size,
tree_width,
} => {
let output_path = output_path.unwrap_or_else(|| {
let mut new_path = input_path.clone();
new_path.set_extension("car");
new_path
});
let cid = convert_file_to_car(&input_path, &output_path, overwrite).await?;

// Build config with UnixFS wrapping by default
let config = Config::balanced(chunk_size, tree_width, raw);

let cid = convert_file_to_car(&input_path, &output_path, config, false).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to not overwrite by default now. If we do not want to support overwriting can we remove this boolean?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just not remove the overwrite flag at all; what isn't broken doesn't need fixing :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are supporting it (see #728 (comment))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overwrite flag was useful in our experiments; I'd let it be.


if quiet {
println!("{}", cid);
Expand All @@ -75,14 +91,12 @@ async fn main() -> Result<(), Error> {
new_path
});
extract_file_from_car(&input_path, &output_path).await?;

println!(
"Successfully converted CARv2 file {} and saved it to to {}",
"Successfully converted CARv2 file {} and saved it to {}",
input_path.display(),
output_path.display()
);
}
}

Ok(())
}
1 change: 1 addition & 0 deletions mater/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["io"] }
tracing = { workspace = true }

# Optional dependencies
blockstore = { workspace = true, optional = true }
Expand Down
134 changes: 132 additions & 2 deletions mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,138 @@ mod unixfs;
mod v1;
mod v2;

// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
use std::{
collections::{HashMap, HashSet},
io::SeekFrom,
};

pub use ipld_core::cid::Cid;
use ipld_core::codec::Codec;
use ipld_dagpb::DagPbCodec;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
pub use stores::{
create_filestore, Blockstore, Config, FileBlockstore, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH,
};
use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
MultihashIndexSorted, Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer,
};

/// Represents the location and size of a block in the CAR file.
///
/// Both fields are plain `u64` values, so the type is trivially copyable and
/// can be compared, hashed and printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BlockLocation {
    /// The byte offset in the CAR file where the block starts.
    pub offset: u64,
    /// The size (in bytes) of the block.
    pub size: u64,
}

/// A simple blockstore backed by a CAR file and its index.
///
/// Blocks are fetched by looking a CID up in `index`, seeking `reader` to the
/// recorded offset and reading the recorded number of bytes (see `get_block`).
pub struct CarBlockStore<R> {
/// Source the block bytes are read from; `get_block` seeks and reads on it.
// NOTE(review): this field is private and no constructor is visible in this
// section — confirm how callers outside this module are meant to build a store.
reader: R,
/// Mapping from CID to block location.
pub index: HashMap<Cid, BlockLocation>,
}

impl<R> CarBlockStore<R>
where
R: AsyncSeekExt + AsyncReadExt + Unpin,
{
/// Extract content by traversing the UnixFS DAG using the index.
pub async fn extract_content_via_index<W>(
&mut self,
root: &Cid,
output: &mut W,
) -> Result<(), Error>
where
W: AsyncWriteExt + Unpin,
{
// To avoid processing a block more than once.
let mut processed = HashSet::new();
// We use a stack for DFS traversal.
let mut to_process = vec![*root];

while let Some(current_cid) = to_process.pop() {
if processed.contains(&current_cid) {
continue;
Comment on lines +74 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is duplicated in the DFS. We're already checking it here:

 if !processed.contains(&link.cid) {
 	to_process.push(link.cid);
}

}
processed.insert(current_cid);

// Retrieve block by CID via the index.
let block_bytes = self.get_block(&current_cid).await?;

// Write the raw block data. In a real UnixFS traversal you might need
// to reconstruct file content in order.
output.write_all(&block_bytes).await?;
Comment on lines +82 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is that not a real UnixFS traversal?
I'm confused: what is extract_content_via_index supposed to do?
What do we mean by "index" if we are not actually using the index anywhere?


// If the block is a DAG-PB node, decode and enqueue its children.
if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
let mut cursor = std::io::Cursor::new(&block_bytes);
// Propagate any error that occurs during decoding.
let pb_node: ipld_dagpb::PbNode =
DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
for link in pb_node.links {
if !processed.contains(&link.cid) {
to_process.push(link.cid);
}
}
}
Comment on lines +87 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
let mut cursor = std::io::Cursor::new(&block_bytes);
// Propagate any error that occurs during decoding.
let pb_node: ipld_dagpb::PbNode =
DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
for link in pb_node.links {
if !processed.contains(&link.cid) {
to_process.push(link.cid);
}
}
}
if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
let mut cursor = std::io::Cursor::new(&block_bytes);
// Propagate any error that occurs during decoding.
let pb_node: ipld_dagpb::PbNode =
DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
for link in pb_node.links {
if !processed.contains(&link.cid) {
to_process.push(link.cid);
}
}
} else {
return Err(Error::UnsupportedCidCodec(current_cid.codec()));
}

}

Ok(())
}
}

impl<R> CarBlockStore<R>
where
R: AsyncSeek + AsyncReadExt + Unpin,
{
Comment on lines +104 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we pull these in under one impl block? They seem to be generic over the same trait bounds.

I don't know why one is AsyncSeek and the other AsyncSeekExt, though.

/// Given a reader positioned at the start of a CAR file,
/// load the CARv2 index and build a mapping of CID -> (offset, size).
/// For simplicity, assume the CAR header has been read and the index offset is known.
pub async fn load_index(
mut reader: R,
index_offset: u64,
) -> Result<HashMap<Cid, BlockLocation>, Error> {
// Seek to the start of the index.
reader.seek(SeekFrom::Start(index_offset)).await?;
// Parse the index according to the CARv2 spec. For demonstration,
// we assume a very simple format where each index entry is:
// [CID length (u8)][CID bytes][offset (u64)][size (u64)]
let mut index = HashMap::new();
// In a real implementation you’d read until EOF or index length.
// Here we use a simple loop:
loop {
let cid_len = match reader.read_u8().await {
Ok(n) => n as usize,
Err(_) => break,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break without any errors? Shouldn't we exit early?

};
let mut cid_buf = vec![0u8; cid_len];
reader.read_exact(&mut cid_buf).await?;
let cid = Cid::try_from(cid_buf).map_err(|e| Error::Other(e.to_string()))?;

let offset = reader.read_u64_le().await?;
let size = reader.read_u64_le().await?;
index.insert(cid, BlockLocation { offset, size });
}
Ok(index)
}

/// Retrieve a block by its CID. This method uses the in-memory index
/// to seek directly to the block’s location.
pub async fn get_block(&mut self, cid: &Cid) -> Result<Vec<u8>, Error> {
if let Some(location) = self.index.get(cid) {
self.reader.seek(SeekFrom::Start(location.offset)).await?;
let mut buf = vec![0u8; location.size as usize];
self.reader.read_exact(&mut buf).await?;
Ok(buf)
} else {
Err(Error::BlockNotFound(cid.to_string()))
}
}
}

/// CAR handling errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -111,6 +233,14 @@ pub enum Error {
/// See [`DagPbError`](ipld_dagpb::Error) for more information.
#[error(transparent)]
DagPbError(#[from] ipld_dagpb::Error),

/// Catch-all error for miscellaneous cases.
#[error("other error: {0}")]
Other(String),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Other variant is used only once. Instead of using Other you should use CidError or InvalidCid and remove this variant.


/// Error indicating that the requested block could not be found in the CAR file's index.
#[error("block not found: {0}")]
BlockNotFound(String),
}

#[cfg(test)]
Expand Down
Loading