
support unixfs dag in mater #728

Closed

Conversation

Contributor

@pete-eiger pete-eiger commented Feb 4, 2025

Description

This PR extends mater’s content extraction capabilities to support .car files that store UnixFS DAGs. Previously, extraction was limited to blocks containing arbitrary content and assumed the blocks appeared in the correct sequential order. With this change, mater can now correctly traverse and extract content from files wrapped in UnixFS DAG structures, as used in IPFS.

The implementation introduces a new code branch in the filestore creation and extraction paths. Specifically:

  • New functions have been added to walk a UnixFS DAG. Extraction now recognizes and processes DAG-PB nodes by reading their links, reconstructing the original file content from its underlying tree structure.
  • The reader iterates over the DAG, recursively queuing child nodes and ensuring all parts of the DAG are traversed and written out correctly.
  • The Config enum has been updated with additional constructors: balanced_unixfs(chunk_size, tree_width) for UnixFS-wrapped content (the default behavior, for IPFS compatibility) and balanced_raw(chunk_size, tree_width) for direct/raw storage without UnixFS metadata. These let users choose whether content is processed as UnixFS or in raw mode; a hedged usage sketch follows below. The CLI has been updated with new flags (raw, chunk_size, and tree_width) to control this behavior. Several unit tests have been updated, and inline documentation explains the logic and changes.
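For illustration, a hedged sketch of how these constructors might be called; the 256 KiB chunk size and tree width of 174 are assumed example values (common IPFS defaults), not values taken from this PR:

use mater::Config;

fn example_configs() -> (Config, Config) {
    // Wrap content in a UnixFS DAG (the default, IPFS-compatible behavior).
    let unixfs = Config::balanced_unixfs(256 * 1024, 174);
    // Store content directly as raw blocks, without UnixFS metadata.
    let raw = Config::balanced_raw(256 * 1024, 174);
    (unixfs, raw)
}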

Checklist

  • Are there important points that reviewers should know?
    • If yes, which ones? - we have to confirm that this is the expected behavior of the unixfs functionality and that no regressions have been introduced to the raw data option
  • Make sure that you described what this change does.
  • If there are follow-ups, have you created issues for them?
    • I haven't fully tackled content extraction as I didn't want this PR to get too bloated
  • Have you tested this solution?
    • this needs better testing, am open to suggestions
  • Were there any alternative implementations considered?
  • Did you document new (or modified) APIs? - inline comments

Open questions (aside from the ones raised in the checklist)

Not sure whether to remove the overwrite field on Config. Thoughts?

@pete-eiger pete-eiger marked this pull request as draft February 4, 2025 10:29
@pete-eiger pete-eiger force-pushed the feat/675/mater-convert-wrap-contents-in-unixfs-dag branch from d6ab6b4 to fe052bb Compare February 4, 2025 10:50
@pete-eiger pete-eiger self-assigned this Feb 4, 2025
@pete-eiger pete-eiger requested a review from a team February 4, 2025 10:51
@pete-eiger pete-eiger added the ready for review label Feb 4, 2025
@pete-eiger pete-eiger linked an issue Feb 4, 2025 that may be closed by this pull request
@pete-eiger pete-eiger marked this pull request as ready for review February 4, 2025 10:51
@pete-eiger pete-eiger added and removed the ready for review label Feb 4, 2025
@pete-eiger pete-eiger force-pushed the feat/675/mater-convert-wrap-contents-in-unixfs-dag branch from cdbc77a to 3edb9d5 Compare February 4, 2025 11:03
@pete-eiger pete-eiger added and removed the ready for review label Feb 4, 2025
Collaborator

@jmg-duarte jmg-duarte left a comment

I didn't audit the balanced streams, I'll leave that for the final review.

sizes.push(link_info.raw_data_length);
links.push(PbLink {
cid: *child_cid,
name: Some("".to_string()),
Collaborator

Couldn't it be None instead?

Contributor Author

Ideally, but right now if we use None some tests will fail:

failures:
    stores::blockstore::tests::byte_eq_spaceglenda
    stores::blockstore::tests::dedup_lorem_roundtrip
    stores::filestore::test::test_spaceglenda_roundtrip
    v2::writer::tests::full_spaceglenda

It's because changing the link's name from Some("") to None affects the serialized output. In our DAG-PB, the Name field is encoded as a field in the protobuf message. When you set it to Some(""), even though the string is empty, the encoder still writes the tag and a length (which will be 0). In contrast, if you set it to None, that field is omitted entirely. This difference causes the final serialized CAR file to have a slightly different size (and different offsets for index entries) compared to the expected reference produced by go-car or as specified by our tests.

Since our tests compare exact byte lengths and offsets (for example, expecting 1358 bytes versus 1350 bytes, etc.), the change leads to mismatches. In this case, if the expected behavior (or the spec) requires that an empty name is explicitly encoded as an empty string, we need to keep using Some("").

"No name" and "empty name" are not equivalent; they differ in the encoding. The tests check for the exact binary output, so we have to keep the field present as Some("") if we want to match the expected output.
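For illustration, a minimal standalone sketch of that wire-level difference (hand-encoded bytes following the DAG-PB schema, where PBLink.Name is field 2 with wire type 2; this is not mater code):

fn main() {
    // Tag byte for PBLink.Name = (field_number << 3) | wire_type = (2 << 3) | 2 = 0x12.
    let empty_name: &[u8] = &[0x12, 0x00]; // Some(""): tag + zero length = 2 extra bytes
    let no_name: &[u8] = &[];              // None: the field is omitted entirely

    // Two extra bytes per link, which shifts every byte length and index offset
    // that the tests compare against.
    assert_eq!(empty_name.len() - no_name.len(), 2);
}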

Collaborator

@jmg-duarte jmg-duarte Feb 10, 2025

I remember now. Can you leave a comment explaining the edge case? Since we're looking to match go-car's impl, etc

@pete-eiger pete-eiger added and removed the ready for review label Feb 5, 2025
@pete-eiger pete-eiger force-pushed the feat/675/mater-convert-wrap-contents-in-unixfs-dag branch from 9a9746a to 208c554 Compare February 5, 2025 15:24
@pete-eiger pete-eiger force-pushed the feat/675/mater-convert-wrap-contents-in-unixfs-dag branch from 208c554 to 636873a Compare February 5, 2025 15:25
@pete-eiger pete-eiger added and removed the ready for review label Feb 5, 2025
raw,
);

let cid = convert_file_to_car(&input_path, &output_path, config, false).await?;
Contributor

We seem to not overwrite by default now. If we do not want to support overwriting, can we remove this boolean?

Collaborator

Let's just not remove the overwrite flag at all, what isn't broken doesn't need fixing :)

Contributor Author

We are supporting it (see #728 (comment))

Contributor

The overwrite flag was useful in our experiments, I'd let it be.

@pete-eiger pete-eiger removed the ready for review label Feb 6, 2025
@pete-eiger pete-eiger added the ready for review label Feb 6, 2025
@pete-eiger pete-eiger added and removed the ready for review label Feb 6, 2025
@pete-eiger pete-eiger added and removed the ready for review label several times Feb 7, 2025
Member

@cernicc cernicc left a comment

Publishing a partial review.

@@ -23,6 +24,8 @@ use tokio_util::{
use tower_http::trace::TraceLayer;
use uuid::Uuid;

type BoxedStream = Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>;
Member

tracing::error!(%err, "failed to execute blocking task");
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
})??;

// Branching needed here since the resulting `StreamReader`s don't have the same type
// Determine how to obtain the file's bytes:
let file_cid = if request.headers().contains_key("Content-Type") {
Member

This change is the biggest in the upload function. I am trying to understand what it actually changes, but I can't see the reason why the custom stream is needed. Field already implements the Stream trait.

Collaborator

I hadn't noticed this before. Why is one branch changed and not the other? Currently, this code is working; why move to a custom-made stream?
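For reference, a hedged sketch of that idea, assuming the field's error type implements std::error::Error; the helper name into_reader is made up here:

use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;

// Any `Stream<Item = Result<Bytes, E>>` (e.g. a multipart `Field`) can feed
// `StreamReader` directly once its error is mapped into `std::io::Error`,
// so no hand-rolled stream type should be necessary.
fn into_reader<S, E>(stream: S) -> impl AsyncRead
where
    S: Stream<Item = Result<Bytes, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    StreamReader::new(stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
}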

/// Reads bytes from the source and writes them to a CAR file.
/// Converts a source stream into a CARv2 file and writes it to an output stream.
///
/// Send + 'static bounds are required because the UnixFS processing involves:
Member

The bounds descriptions are not needed in the docs


/// Catch-all error for miscellaneous cases.
#[error("other error: {0}")]
Other(String),
Member

This Other variant is used only once. Instead of using Other you should use CidError or InvalidCid and remove this variant.

Collaborator

@jmg-duarte jmg-duarte left a comment

I'm sorry but I can't allow this PR to go through with all the seemingly unrelated refactors.

Some of them are even clear downgrades, like removing logging from unhandled errors.

tracing::error!(%err, path = %file_path.display(), "failed to remove uploaded piece");
}

let _ = tokio::fs::remove_file(&file_path).await;
Collaborator

You're removing the log and not returning the error.
Silently failing and leaving the user in the dark IS NOT OK

Comment on lines +311 to +315
.await
.map_err(|err| {
tracing::error!(%err, "failed to rename the CAR file");
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
})
.await?;
})?;
Collaborator

Any reason the await is better here?

Comment on lines -301 to -302
// We need to rename the file since the original storage name is based on the whole deal proposal CID,
// however, the piece is stored based on its piece_cid
Collaborator

I don't understand why this is being removed

Comment on lines -267 to -269
// Calculate the piece commitment in the blocking thread pool since `calculate_piece_commitment`
// is CPU intensive — i.e. blocking — potentially improvement is to move this completely out of
// the tokio runtime into an OS thread
Collaborator

Recurring theme: why are you removing comments that explain the context behind the decisions?

Comment on lines -256 to -258
let _ = tokio::fs::remove_file(&file_path).await.inspect_err(
|err| tracing::error!(%err, path = %file_path.display(), "failed to delete file"),
);
Collaborator

Removing the error logs again. Please explain why this would be a good idea

.await
.map_err(|err| {
tracing::error!(%err, "failed to store file into CAR archive");
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
})?
} else {
// Read the request body into a CAR archive
// For direct uploads, convert the request body into a stream.
Collaborator

What is a direct upload?

let deal_cid = cid::Cid::from_str(&cid).map_err(|err| {
tracing::error!(cid, "failed to parse cid");
(StatusCode::BAD_REQUEST, err.to_string())
})?;

// Use deal_db (we need it now, so we clone it)
Collaborator

This comment adds no value. I don't get it: you're removing contextual comments that provide insight into why things are done the way they are, but adding comments like this where it's very obvious what's going on.

let proposed_deal =
// Move the fetch to the blocking pool since the RocksDB API is sync
Collaborator

More context being removed

Comment on lines +59 to +77
let read_bytes = source.read_buf(&mut buf).await?;
trace!(bytes_read = read_bytes, buffer_size = buf.len(), "Buffer read status");
// EOF but there's still content to yield -> yield it
while buf.len() >= chunk_size {
    // The buffer may have a larger capacity than chunk_size due to reserve;
    // this also means that our read may have read more bytes than we expected,
    // that's why we check if the length is bigger than the chunk_size and, if so,
    // we split the buffer to the chunk_size, then freeze and return
    let chunk = buf.split_to(chunk_size);
    yield chunk.freeze();
} // otherwise, the buffer is not full, so we don't do a thing

if read_bytes == 0 && !buf.is_empty() {
    let chunk = buf.split();
    yield chunk.freeze();
    break;
} else if read_bytes == 0 {
    break;
}
Collaborator

Unless you can explain where the bug was, I expect these changes to be reverted.

I can't allow you to refactor a core piece of logic that is not broken and was the product of multiple discussions.

Contributor

I think I understand what was going on.
If chunk_size == 2, but read_bytes == 0 and there is still buf.len() == 4 from the previous invocation, you should not yield it in its entirety, but continue chunking it.
It makes sense, HOWEVER.

Then there is a bug here (or it should be a panic):

            if read_bytes == 0 && !buf.is_empty() {
                let chunk = buf.split();
                yield chunk.freeze();
                break;

because if we're chunking, and the loop above gave out all of the nicely sized chunks, why are we yielding something which is not chunk_size? Is this intended?

Imagine, buf.len() == 5, chunk_size == 2, it goes:

yield 2;
yield 2;
yield 1;

so the last yielded chunk won't be a nice chunk.

.....

Then I took a look at let mut buf = BytesMut::with_capacity(chunk_size);. At first glance it looks like source.read_buf(&mut buf).await? is always going to read chunk_size bytes into buf. But is it?

Does BytesMut::with_capacity(chunk_size) guarantee that source.read_buf(&mut buf).await? will always read at most chunk_size bytes? A quick search told me that it depends on the behaviour of the underlying read_buf, so I'm not sure.
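For what it's worth, a small standalone sketch (using only the bytes crate, not the mater code) that reproduces the example above:

use bytes::{Bytes, BytesMut};

// With chunk_size = 2 and 5 bytes buffered at EOF, the loop yields 2, 2 and
// then a trailing chunk of 1.
fn drain_at_eof(mut buf: BytesMut, chunk_size: usize) -> Vec<Bytes> {
    let mut out = Vec::new();
    while buf.len() >= chunk_size {
        out.push(buf.split_to(chunk_size).freeze()); // full-sized chunks first
    }
    if !buf.is_empty() {
        out.push(buf.split().freeze()); // trailing partial chunk (< chunk_size)
    }
    out
}

fn main() {
    let mut buf = BytesMut::new();
    buf.extend_from_slice(b"hello");
    let sizes: Vec<usize> = drain_at_eof(buf, 2).iter().map(Bytes::len).collect();
    assert_eq!(sizes, vec![2, 2, 1]); // the last chunk is not a "nice" chunk
}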

Comment on lines +481 to +482
// Use `file_path` here instead of the undefined `piece_path`
let (piece_commitment, _) = commp(&file_path)?;
Collaborator

Nothing else changed. Was there a bug in delia?

Comment on lines +74 to +75
if processed.contains(&current_cid) {
continue;
Contributor

This logic is duplicated in the DFS. We're already checking it here:

 if !processed.contains(&link.cid) {
 	to_process.push(link.cid);
}

Comment on lines +82 to +84
// Write the raw block data. In a real UnixFS traversal you might need
// to reconstruct file content in order.
output.write_all(&block_bytes).await?;
Contributor

How is that not a real UnixFS traversal?
I'm confused, what is extract_content_via_index supposed to do?
What do we mean by index if we are not actually using an index anywhere?

Comment on lines +87 to +97
if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
    let mut cursor = std::io::Cursor::new(&block_bytes);
    // Propagate any error that occurs during decoding.
    let pb_node: ipld_dagpb::PbNode =
        DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
    for link in pb_node.links {
        if !processed.contains(&link.cid) {
            to_process.push(link.cid);
        }
    }
}
Contributor

Suggested change (adds an else branch to the code above):

if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
    let mut cursor = std::io::Cursor::new(&block_bytes);
    // Propagate any error that occurs during decoding.
    let pb_node: ipld_dagpb::PbNode =
        DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
    for link in pb_node.links {
        if !processed.contains(&link.cid) {
            to_process.push(link.cid);
        }
    }
} else {
    return Err(Error::UnsupportedCidCodec(current_cid.codec()));
}

Comment on lines +104 to +107
impl<R> CarBlockStore<R>
where
R: AsyncSeek + AsyncReadExt + Unpin,
{
Contributor

Can't we pull these under one impl block? They seem to be generic over the same trait bounds.

I don't know why one is AsyncSeek, the other AsyncSeekExt though.


Comment on lines +123 to +125
car_v1_start.try_into().unwrap(),
(index_offset - car_v1_start).try_into().unwrap(),
index_offset.try_into().unwrap(),
Contributor

expects?


writer.finish().await?;

Ok(root.unwrap())
Contributor

How are we sure that root is not None?

Comment on lines +147 to +165
let chunker = async_stream::try_stream! {
    let mut buf = BytesMut::with_capacity(chunk_size);
    loop {
        let read_bytes = source.read_buf(&mut buf).await?;
        while buf.len() >= chunk_size {
            let chunk = buf.split_to(chunk_size);
            yield chunk.freeze();
        }

        if read_bytes == 0 && !buf.is_empty() {
            let chunk = buf.split();
            yield chunk.freeze();
            break;
        } else if read_bytes == 0 {
            break;
        }
    }
};

Contributor

If we're reusing the chunker across two methods, it deserves to be a separate method.
This logic is complex enough not to be duplicated by copy-pasting.
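For illustration, a hedged sketch of what that shared helper might look like (the name chunk_stream is hypothetical; it simply mirrors the duplicated try_stream! blocks quoted above):

use async_stream::try_stream;
use bytes::{Bytes, BytesMut};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncReadExt};

// One chunker shared by both import paths: emits chunk_size-sized chunks,
// plus a final partial chunk at EOF if any bytes remain buffered.
fn chunk_stream<Src>(
    mut source: Src,
    chunk_size: usize,
) -> impl Stream<Item = std::io::Result<Bytes>>
where
    Src: AsyncRead + Unpin,
{
    try_stream! {
        let mut buf = BytesMut::with_capacity(chunk_size);
        loop {
            let read_bytes = source.read_buf(&mut buf).await?;
            // Emit every full chunk currently buffered.
            while buf.len() >= chunk_size {
                yield buf.split_to(chunk_size).freeze();
            }
            if read_bytes == 0 {
                // EOF: flush any trailing partial chunk, then stop.
                if !buf.is_empty() {
                    yield buf.split().freeze();
                }
                break;
            }
        }
    }
}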

Ok(root.unwrap())
}

async fn balanced_import_unixfs<Src, Out>(
Contributor

Those methods look almost exactly the same; I struggle to find the difference between the two.

  1. Add docs for this one
  2. Can we make it clearer that most of the logic is actually the same? E.g., extract some shared methods?

@pete-eiger pete-eiger closed this Feb 11, 2025
Labels: ready for review

Successfully merging this pull request may close these issues.

feat: Mater convert, wrap contents in unixfs dag
5 participants