From 51c5e1d56a3e7915c8b26daa633411434ef2898e Mon Sep 17 00:00:00 2001 From: Kit Plummer Date: Sun, 21 Jun 2026 08:37:51 -0700 Subject: [PATCH 1/2] docs(attachments): correct inbox layout to mirrored outbox path (#173) + version minimums The example README described delivered files landing at inbox/{distribution_id}/{filename}. As of v0.4.8 (#173) the inbox mirrors the sender's outbox layout instead (outbox/sub/f -> inbox/sub/f, original name, latest-wins), with an unsafe-name fallback to {distribution_id}.bin at the inbox root. Validated end-to-end across two arm64 hosts, both directions. - README: rewrite the three stale {distribution_id} path claims; add a per-feature version-minimums table (PRD-006 v0.2.0, reliable delivery v0.3.0, derive-id v0.4.4, outbox-watch v0.4.5, mirrored layout v0.4.8). - Bump all three example compose pins v0.4.5 -> v0.4.8 so they exhibit the documented layout and match the validated run. Does NOT touch the peat-cli InboxSink / e2e TestInboxSink / main.rs doc comment, which still describe+implement the old layout (tracked separately). --- examples/compose/attachments/README.md | 52 ++++++++++++------- .../attachments/docker-compose.multi-host.yml | 2 +- .../attachments/docker-compose.two-node.yml | 4 +- .../compose/attachments/docker-compose.yml | 2 +- 4 files changed, 36 insertions(+), 24 deletions(-) diff --git a/examples/compose/attachments/README.md b/examples/compose/attachments/README.md index 42fb28e..a5482a2 100644 --- a/examples/compose/attachments/README.md +++ b/examples/compose/attachments/README.md @@ -13,18 +13,20 @@ The single-node setup below is the per-size benchmark for sender-side ingest. **For the real delivery demo, jump to "Two-node delivery" below.** -> **Uses published `v0.3.1` image by default.** The PRD-006 attachment -> surface first shipped in `v0.2.0`, but `v0.2.x` carries the peat#864 -> substrate bug: the sender's `SubscribeAttachmentBundle` stream stalls -> one frame short of terminal on a real cross-peer transfer. `v0.3.0` -> closes that end-to-end (and `v0.3.1` relocates the receive lifecycle -> into peat-protocol with no behavior change), so the two-node delivery -> demo needs `v0.3.0+`. -> Earlier `v0.1.x` images predate PRD-006 entirely and fail with -> `unimplemented: method not found`. To test local changes ahead of a -> release, comment out the `image:` line and uncomment the `build:` -> block in `docker-compose.yml` (or `docker-compose.two-node.yml` for -> the cross-peer demo) to build from the repo root. +> **All compose files here pin `v0.4.8`** (the latest release), which satisfies +> every attachment feature in the table below. To test local changes ahead of a +> release, comment out the `image:` line and uncomment the `build:` block in any +> of the compose files to build from the repo root. + +### Attachment feature version minimums + +| Capability | Min version | Notes | +|---|---|---| +| PRD-006 attachment RPCs (`SendAttachments`, status lookup) | `v0.2.0` | `v0.1.x` predates PRD-006 and fails with `unimplemented: method not found`. | +| Reliable cross-peer delivery | `v0.3.0` | `v0.2.x` carries the peat#864 substrate bug — the sender's `SubscribeAttachmentBundle` stream stalls one frame short of terminal on a real transfer. `v0.3.1` relocated the receive lifecycle into peat-protocol (no behavior change). | +| Deterministic identity + `derive-id` (multi-host peering) | `v0.4.4` | Offline peer-id derivation; see [Multi-host delivery](#multi-host-delivery-separate-machines-no-mdns) below. | +| Hands-off outbox watcher (`PEAT_NODE_ATTACHMENT_OUTBOX_WATCH`) | `v0.4.5` | Auto-distributes any stable new file dropped in an outbox root — no `SendAttachments` call. | +| Inbox mirrors the sender's outbox layout | `v0.4.8` | [#173](https://github.com/defenseunicorns/peat-node/issues/173); earlier images nested every delivery under `inbox/{distribution_id}/{filename}`. | The two-node CRDT sync demo lives one directory up at [`../docker-compose.yml`](../docker-compose.yml); this one is the @@ -93,11 +95,11 @@ landing). docker compose -f docker-compose.two-node.yml up -d ./peer.sh # bidirectional ConnectPeer ENDPOINT=http://127.0.0.1:50061 OUTBOX_DIR=outbox-a ./send.sh -ls inbox-b/ # files appear here per distribution_id +ls inbox-b/ # delivered files mirror the sender's outbox layout docker compose -f docker-compose.two-node.yml down -v ``` -(Pulls `ghcr.io/defenseunicorns/peat-node:v0.4.5`. For testing local +(Pulls `ghcr.io/defenseunicorns/peat-node:v0.4.8`. For testing local changes, swap the `image:` line for the commented `build:` block in both services.) @@ -118,10 +120,19 @@ only by `ConnectPeer` *into* A. Without A → B as well, A's distribution doc carries an empty `target_nodes` and B correctly concludes "not for me." -When `send.sh` is pointed at A, B's inbox accumulates -`inbox-b/{distribution_id}/{filename}` for each successful delivery. -Apps watching the inbox can correlate a `distribution_id` back to the -sender via `GetAttachmentDistribution`. +When `send.sh` is pointed at A, each delivered file lands in B's inbox +**mirroring the sender's outbox layout**: `outbox-a/hello.txt` arrives at +`inbox-b/hello.txt` (and a file under `outbox-a/sub/` at `inbox-b/sub/`), +byte-identical, latest-wins on re-delivery. Apps watching the inbox can still +correlate a delivery back to the sender via `GetAttachmentDistribution` — the +`distribution_id` travels in the synced `file_distributions` doc and the +receive-side log line, not in the on-disk path. + +> **Inbox layout changed in v0.4.8 ([#173](https://github.com/defenseunicorns/peat-node/issues/173)).** +> Earlier images nested every delivery under `inbox-b/{distribution_id}/{filename}`; +> v0.4.8+ mirrors the sender's outbox path instead. A sender-supplied name that +> can't be safely resolved (absolute, or containing `..`) falls back to a flat +> `{distribution_id}.bin` at the inbox root. ## Multi-host delivery (separate machines, no mDNS) @@ -183,8 +194,9 @@ matching opt-in lines are stubbed in `docker-compose.multi-host.yml`: writable host dir at `/var/lib/peat/inbox`. Send with `SendAttachments` (scope `allNodes`); the receiver's inbox watcher -fetches the blob over iroh and writes it to -`{inbox}/{distribution_id}/{filename}`, byte-identical to the source. +fetches the blob over iroh and writes it to `{inbox}/{relative_path}` — +mirroring the sender's outbox layout (v0.4.8+; see the [#173](https://github.com/defenseunicorns/peat-node/issues/173) +note under "Two-node delivery" above) — byte-identical to the source. **Hands-off (synced-folder) mode.** Set `PEAT_NODE_ATTACHMENT_OUTBOX_WATCH=true` on the sender (requires v0.4.5+) and you don't call `SendAttachments` at all: diff --git a/examples/compose/attachments/docker-compose.multi-host.yml b/examples/compose/attachments/docker-compose.multi-host.yml index 3859d4f..69fd8f4 100644 --- a/examples/compose/attachments/docker-compose.multi-host.yml +++ b/examples/compose/attachments/docker-compose.multi-host.yml @@ -83,7 +83,7 @@ services: # Deterministic identity + `derive-id` require peat-node v0.4.4+. To run # local changes ahead of a release, comment out `image:` and uncomment # `build:`. - image: ghcr.io/defenseunicorns/peat-node:v0.4.5 + image: ghcr.io/defenseunicorns/peat-node:v0.4.8 # build: # context: ../../.. container_name: peat-node-${NODE_ID} diff --git a/examples/compose/attachments/docker-compose.two-node.yml b/examples/compose/attachments/docker-compose.two-node.yml index 9e3403d..045a7c3 100644 --- a/examples/compose/attachments/docker-compose.two-node.yml +++ b/examples/compose/attachments/docker-compose.two-node.yml @@ -34,7 +34,7 @@ services: # SubscribeAttachmentBundle one frame short of terminal. For # testing local changes, comment out `image:` and uncomment the # `build:` block. - image: ghcr.io/defenseunicorns/peat-node:v0.4.5 + image: ghcr.io/defenseunicorns/peat-node:v0.4.8 # build: # context: ../../.. container_name: peat-node-attachments-a @@ -57,7 +57,7 @@ services: - ./outbox-a:/var/lib/peat/outbox:ro peat-node-b: - image: ghcr.io/defenseunicorns/peat-node:v0.4.5 + image: ghcr.io/defenseunicorns/peat-node:v0.4.8 # build: # context: ../../.. container_name: peat-node-attachments-b diff --git a/examples/compose/attachments/docker-compose.yml b/examples/compose/attachments/docker-compose.yml index 2bcb70d..dab8189 100644 --- a/examples/compose/attachments/docker-compose.yml +++ b/examples/compose/attachments/docker-compose.yml @@ -29,7 +29,7 @@ services: # use v0.3.0+ for the delivery demo. For testing changes ahead of # a release, comment out the `image:` line and uncomment the # `build:` block to build from the repo root. - image: ghcr.io/defenseunicorns/peat-node:v0.4.5 + image: ghcr.io/defenseunicorns/peat-node:v0.4.8 # build: # context: ../../.. container_name: peat-node-attachments From ebf898b219820d06def4a8c1b130d112bf758fbe Mon Sep 17 00:00:00 2001 From: Kit Plummer Date: Sun, 21 Jun 2026 09:01:59 -0700 Subject: [PATCH 2/2] fix(attachments): align peat-cli + test inbox sinks to mirrored outbox layout (#173) #173 changed the production FilesystemInboxSink to write delivered blobs at {inbox}/{relative_path} (mirroring the sender's outbox layout), but the peat-cli InboxSink and the e2e TestInboxSink still nested every delivery under {inbox}/{distribution_id}/{filename}. A peat-cli receiver and a peat-node receiver therefore disagreed on where files land. Bring both sinks into lockstep with production: - peat-cli InboxSink: resolve a sanitised inbox-relative path (subdirs preserved, absolute/.. rejected -> {distribution_id}.bin fallback), create nested parents, add the post-write size-validation guard. - e2e TestInboxSink: same path resolution + already_delivered keyed on the mirrored path. - Update the two e2e scenarios to assert the mirrored path and that no {distribution_id}/ dir is created. - Fix stale doc comments: attach.rs module + watch help, main.rs --attachment-inbox. Regression tests (peat-cli lib): deliver_mirrors_relative_path_not_distribution_id_dir, deliver_traversal_name_falls_back_inside_inbox, deliver_size_mismatch_bails_without_publishing, plus inbox_relpath_* unit coverage. Adds chrono as a dev-dependency for the DistributionDocument fixture. Verified: fmt, clippy -D warnings, cargo test --workspace (the two real-iroh attach e2e scenarios now assert the mirrored layout end-to-end). --- Cargo.lock | 1 + crates/peat-cli/Cargo.toml | 3 + crates/peat-cli/src/cli/attach.rs | 243 +++++++++++++++++++------ crates/peat-cli/tests/e2e/scenarios.rs | 51 +++--- crates/peat-cli/tests/e2e/topology.rs | 88 ++++----- src/main.rs | 4 +- 6 files changed, 264 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 731b383..a296f6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3893,6 +3893,7 @@ dependencies = [ "async-trait", "automerge", "base64", + "chrono", "clap", "dirs", "iroh", diff --git a/crates/peat-cli/Cargo.toml b/crates/peat-cli/Cargo.toml index f8e4e9b..fe91d26 100644 --- a/crates/peat-cli/Cargo.toml +++ b/crates/peat-cli/Cargo.toml @@ -66,3 +66,6 @@ serial_test = "3" # Unique per-run formation id in the mDNS e2e test so concurrent runs on # the shared self-hosted runner can't discover each other over loopback. uuid = { version = "1", features = ["v4"] } +# Builds a DistributionDocument fixture (`started_at: DateTime`) for the +# InboxSink layout regression test. Matches the version peat-mesh resolves. +chrono = "0.4" diff --git a/crates/peat-cli/src/cli/attach.rs b/crates/peat-cli/src/cli/attach.rs index 01bfd47..509ab9d 100644 --- a/crates/peat-cli/src/cli/attach.rs +++ b/crates/peat-cli/src/cli/attach.rs @@ -8,7 +8,7 @@ //! distribution from the synced Automerge store. //! * `peat attach watch [--inbox ]` — start a receive watcher that polls //! for incoming distribution documents targeting this node and writes each -//! blob to `//`. Runs until `SIGINT` +//! blob to `/` (mirroring the outbox layout). Runs until `SIGINT` //! unless `--dist-id` is given, in which case it exits once that specific //! distribution has been delivered. @@ -314,9 +314,12 @@ fn parse_priority(s: &str) -> Result { } } -/// Inbox receive sink: writes each delivered blob to -/// `{inbox_root}/{distribution_id}/{filename}` via a tmp-then-rename pair. -/// Mirrors the `FilesystemInboxSink` in `peat-node::attachments::inbox`. +/// Inbox receive sink: writes each delivered blob to `{inbox_root}/{relative_path}`, +/// mirroring the sender's outbox layout (so `outbox/sub/report.pdf` lands at +/// `inbox/sub/report.pdf`) via a tmp-then-rename pair. Re-delivery of the same +/// path overwrites (latest-wins). Faithfully mirrors the `FilesystemInboxSink` in +/// `peat-node::attachments::inbox` — including the path-traversal guard and the +/// post-write size validation; keep the two in lockstep. /// /// When constructed with [`InboxSink::new_with_notify`], fires the provided /// `Notify` after the target distribution's blob is successfully renamed into @@ -346,40 +349,54 @@ impl InboxSink { #[async_trait] impl ReceiveSink for InboxSink { + /// "Already delivered" gate keyed on the mirrored path + /// (`{inbox}/{relative_path}`): a restarted watcher doesn't re-fetch a + /// delivery whose target already holds a regular file of the declared size. async fn already_delivered(&self, doc: &DistributionDocument) -> bool { - let dir = self.inbox_root.join(&doc.distribution_id); - if !dir.is_dir() { - return false; + let rel = inbox_relpath(&doc.blob_metadata) + .unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id))); + match tokio::fs::metadata(self.inbox_root.join(rel)).await { + Ok(md) => md.is_file() && md.len() == doc.blob_size, + Err(_) => false, } - let mut iter = match tokio::fs::read_dir(&dir).await { - Ok(i) => i, - Err(_) => return false, - }; - while let Ok(Some(entry)) = iter.next_entry().await { - if entry - .path() - .file_name() - .and_then(|n| n.to_str()) - .is_some_and(|s| s.starts_with('.')) - { - continue; - } - if let Ok(md) = entry.metadata().await { - if md.is_file() && md.len() == doc.blob_size { - return true; - } - } - } - false } async fn deliver(&self, doc: &DistributionDocument, blob_path: &Path) -> anyhow::Result<()> { - let dir = self.inbox_root.join(&doc.distribution_id); - tokio::fs::create_dir_all(&dir).await?; - let filename = inbox_filename(&doc.blob_metadata, &doc.distribution_id); - let target = dir.join(&filename); - let tmp = dir.join(format!(".{filename}.partial")); + let rel = inbox_relpath(&doc.blob_metadata) + .unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id))); + let target = self.inbox_root.join(&rel); + + // Create the (possibly nested) parent dir and stage the tmp file in it + // so the publishing rename is atomic on the same filesystem. + let parent = target + .parent() + .map(Path::to_path_buf) + .unwrap_or_else(|| self.inbox_root.clone()); + tokio::fs::create_dir_all(&parent).await?; + let fname = target + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or("blob"); + let tmp = parent.join(format!(".{fname}.partial")); + tokio::fs::copy(blob_path, &tmp).await?; + + // Post-write size validation before the publishing rename — matches + // FilesystemInboxSink. iroh already content-verifies the blob on fetch, + // so a size match confirms a complete, untruncated local write. On + // mismatch, drop the tmp and bail so the watcher retries rather than + // publishing a short file. + let written = tokio::fs::metadata(&tmp).await?.len(); + if written != doc.blob_size { + let _ = tokio::fs::remove_file(&tmp).await; + anyhow::bail!( + "inbox write size mismatch for {} (dist {}): wrote {written} bytes, \ + expected {} — leaving for retry", + rel.display(), + doc.distribution_id, + doc.blob_size + ); + } tokio::fs::rename(&tmp, &target).await?; if let Some((target_id, notify)) = &self.delivery_signal { if doc.distribution_id == *target_id { @@ -390,22 +407,39 @@ impl ReceiveSink for InboxSink { } } -/// Derive a safe inbox filename from blob metadata. Strips path separators -/// and leading dots so a sender cannot redirect writes outside the inbox -/// subdirectory. Falls back to `.bin` when metadata has -/// no usable name. -fn inbox_filename(metadata: &BlobMetadata, distribution_id: &str) -> String { - if let Some(raw) = metadata.name.as_ref() { - let last = Path::new(raw) - .file_name() - .and_then(|s| s.to_str()) - .map(|s| s.trim_start_matches('.')) - .filter(|s| !s.is_empty()); - if let Some(name) = last { - return name.to_string(); +/// Resolve the sender-provided `blob_metadata.name` into a safe path **relative +/// to the inbox root**, preserving subdirectories so the inbox mirrors the +/// sender's layout (`outbox/sub/demo.txt` → `inbox/sub/demo.txt`). Faithfully +/// mirrors `peat-node::attachments::inbox::inbox_relpath`. +/// +/// Path-traversal guard: the sender controls `name`, so only `Normal` path +/// components are accepted. Any absolute path, `..`, root, or drive-prefix +/// component returns `None`, and the caller falls back to a flat +/// `.bin` at the inbox root — a malicious or malformed name +/// can never write outside the inbox. Returns `None` for a missing/empty name +/// or one that sanitises to nothing. +fn inbox_relpath(metadata: &BlobMetadata) -> Option { + use std::path::Component; + + let raw = metadata.name.as_deref()?; + if raw.is_empty() { + return None; + } + let mut safe = PathBuf::new(); + for comp in Path::new(raw).components() { + match comp { + Component::Normal(c) => safe.push(c), + Component::CurDir => {} // "." — ignore + // ".." (ParentDir), "/" (RootDir), or a Windows prefix — reject the + // whole name rather than try to repair it. + _ => return None, } } - format!("{distribution_id}.bin") + if safe.as_os_str().is_empty() { + None + } else { + Some(safe) + } } #[cfg(test)] @@ -472,25 +506,124 @@ mod tests { assert_eq!(parse_priority("unknown").unwrap_err().exit_code(), 4); } + fn doc_with(distribution_id: &str, blob_size: u64, name: Option<&str>) -> DistributionDocument { + DistributionDocument { + distribution_id: distribution_id.to_string(), + blob_hash: "deadbeef".to_string(), + blob_size, + blob_metadata: BlobMetadata { + name: name.map(String::from), + content_type: None, + custom: std::collections::HashMap::new(), + }, + scope: DistributionScope::AllNodes, + priority: TransferPriority::Normal, + target_nodes: vec![], + started_at: chrono::Utc::now(), + status: "distributing".to_string(), + cancelled_at: None, + collection: None, + node_statuses: std::collections::HashMap::new(), + } + } + #[test] - fn inbox_filename_uses_name() { - let meta = BlobMetadata::with_name("report.pdf".to_string()); - assert_eq!(inbox_filename(&meta, "dist-X"), "report.pdf"); + fn inbox_relpath_preserves_subdirs() { + assert_eq!( + inbox_relpath(&BlobMetadata::with_name("report.pdf".to_string())), + Some(PathBuf::from("report.pdf")) + ); + assert_eq!( + inbox_relpath(&BlobMetadata::with_name("sub/dir/report.pdf".to_string())), + Some(PathBuf::from("sub/dir/report.pdf")) + ); } #[test] - fn inbox_filename_strips_path() { - let meta = BlobMetadata::with_name("/etc/passwd".to_string()); - assert_eq!(inbox_filename(&meta, "dist-X"), "passwd"); + fn inbox_relpath_rejects_traversal_and_absolute() { + // Sender-controlled name must never escape the inbox — reject so the + // caller uses the `.bin` fallback. + assert_eq!( + inbox_relpath(&BlobMetadata::with_name("../../etc/passwd".to_string())), + None + ); + assert_eq!( + inbox_relpath(&BlobMetadata::with_name("/etc/passwd".to_string())), + None + ); + assert_eq!( + inbox_relpath(&BlobMetadata::with_name("a/../../b".to_string())), + None + ); } #[test] - fn inbox_filename_fallback() { - let meta = BlobMetadata { + fn inbox_relpath_none_for_missing_or_empty() { + let none = BlobMetadata { name: None, content_type: None, custom: std::collections::HashMap::new(), }; - assert_eq!(inbox_filename(&meta, "dist-X"), "dist-X.bin"); + assert_eq!(inbox_relpath(&none), None); + assert_eq!(inbox_relpath(&BlobMetadata::with_name(String::new())), None); + } + + /// Regression guard for #173 / the peat-cli↔peat-node inbox-layout + /// divergence: `deliver()` must mirror the sender's relative path, NOT nest + /// the file under a `{distribution_id}/` directory like the pre-#173 sink. + #[tokio::test] + async fn deliver_mirrors_relative_path_not_distribution_id_dir() { + let inbox = tempfile::tempdir().unwrap(); + let src = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(src.path(), b"hello world").unwrap(); + let sink = InboxSink::new(inbox.path().to_path_buf()); + let doc = doc_with("dist-Y", 11, Some("sub/dir/greeting.txt")); + + sink.deliver(&doc, src.path()).await.unwrap(); + + let landed = inbox.path().join("sub").join("dir").join("greeting.txt"); + assert_eq!(std::fs::read(&landed).unwrap(), b"hello world"); + // The old layout nested under {distribution_id}/ — assert it does NOT. + assert!( + !inbox.path().join("dist-Y").exists(), + "delivery must not nest under a distribution_id directory" + ); + assert!(sink.already_delivered(&doc).await); + + // Re-deliver overwrites (latest-wins); no partial left behind. + sink.deliver(&doc, src.path()).await.unwrap(); + assert_eq!(std::fs::read(&landed).unwrap(), b"hello world"); + assert!(!inbox + .path() + .join("sub") + .join("dir") + .join(".greeting.txt.partial") + .exists()); + } + + #[tokio::test] + async fn deliver_traversal_name_falls_back_inside_inbox() { + let inbox = tempfile::tempdir().unwrap(); + let src = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(src.path(), b"x").unwrap(); + let sink = InboxSink::new(inbox.path().to_path_buf()); + // A hostile name resolves to the flat fallback inside the inbox. + let doc = doc_with("dist-evil", 1, Some("../../../../tmp/pwned")); + sink.deliver(&doc, src.path()).await.unwrap(); + assert!(inbox.path().join("dist-evil.bin").is_file()); + } + + #[tokio::test] + async fn deliver_size_mismatch_bails_without_publishing() { + let inbox = tempfile::tempdir().unwrap(); + let src = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(src.path(), b"hello world").unwrap(); // 11 bytes + let sink = InboxSink::new(inbox.path().to_path_buf()); + let doc = doc_with("dist-Z", 99, Some("x.txt")); // declares 99 + assert!(sink.deliver(&doc, src.path()).await.is_err()); + assert!( + !inbox.path().join("x.txt").exists(), + "a short file must not be published" + ); } } diff --git a/crates/peat-cli/tests/e2e/scenarios.rs b/crates/peat-cli/tests/e2e/scenarios.rs index 95106fb..6c560cd 100644 --- a/crates/peat-cli/tests/e2e/scenarios.rs +++ b/crates/peat-cli/tests/e2e/scenarios.rs @@ -1190,24 +1190,22 @@ async fn attach_send_delivers_file_to_peer_inbox() { ); assert_eq!(v["completed"], 1, "expected 1 completed target: {stdout}"); - // The receive watcher delivers to {inbox}/{dist_id}/{filename}. - let dist_dir = attach_peer.inbox_dir.path().join(&dist_id); + // The receive watcher mirrors the sender's outbox layout: a file sent as + // `payload.txt` lands at {inbox}/payload.txt — NOT {inbox}/{dist_id}/... + let landed = attach_peer.inbox_dir.path().join("payload.txt"); let deadline = std::time::Instant::now() + Duration::from_secs(10); loop { - if dist_dir.is_dir() { - let entries: Vec<_> = std::fs::read_dir(&dist_dir) - .unwrap() - .filter_map(|e| e.ok()) - .filter(|e| !e.file_name().to_str().unwrap_or("").starts_with('.')) - .collect(); - if let Some(entry) = entries.first() { - let content = std::fs::read(entry.path()).unwrap(); - assert_eq!( - content, b"attach e2e test payload", - "delivered file content mismatch" - ); - break; - } + if landed.is_file() { + let content = std::fs::read(&landed).unwrap(); + assert_eq!( + content, b"attach e2e test payload", + "delivered file content mismatch" + ); + assert!( + !attach_peer.inbox_dir.path().join(&dist_id).exists(), + "delivery must not nest under a distribution_id directory; dist_id={dist_id}" + ); + break; } assert!( std::time::Instant::now() < deadline, @@ -1286,19 +1284,18 @@ async fn attach_watch_receives_file_from_peer() { let (_token, handle) = attach_peer.distribute_file(&src, "from_peer.bin").await; let dist_id = handle.distribution_id.clone(); - // Poll CLI's inbox until the file appears. - let inbox_dist_dir = inbox_dir.path().join(&dist_id); + // Poll CLI's inbox until the file appears. The CLI sink mirrors the + // sender's layout: `from_peer.bin` lands at {inbox}/from_peer.bin — NOT + // nested under {inbox}/{dist_id}/. + let landed = inbox_dir.path().join("from_peer.bin"); let deadline = std::time::Instant::now() + Duration::from_secs(15); let content = loop { - if inbox_dist_dir.is_dir() { - let entries: Vec<_> = std::fs::read_dir(&inbox_dist_dir) - .unwrap() - .filter_map(|e| e.ok()) - .filter(|e| !e.file_name().to_str().unwrap_or("").starts_with('.')) - .collect(); - if let Some(entry) = entries.first() { - break std::fs::read(entry.path()).unwrap(); - } + if landed.is_file() { + assert!( + !inbox_dir.path().join(&dist_id).exists(), + "delivery must not nest under a distribution_id directory; dist_id={dist_id}" + ); + break std::fs::read(&landed).unwrap(); } assert!( std::time::Instant::now() < deadline, diff --git a/crates/peat-cli/tests/e2e/topology.rs b/crates/peat-cli/tests/e2e/topology.rs index b4b21d0..eabff3d 100644 --- a/crates/peat-cli/tests/e2e/topology.rs +++ b/crates/peat-cli/tests/e2e/topology.rs @@ -268,9 +268,9 @@ impl TestPeer { /// Filesystem inbox sink for test use. Mirrors the production /// `FilesystemInboxSink` in `peat-node::attachments::inbox`: writes each -/// delivered blob to `{inbox_root}/{distribution_id}/{filename}` via a -/// tmp-then-rename pair. `already_delivered` checks for any regular file of -/// the declared size — same logic as production so restart-idempotency works. +/// delivered blob to `{inbox_root}/{relative_path}` (mirroring the sender's +/// outbox layout) via a tmp-then-rename pair. `already_delivered` keys on that +/// same mirrored path — same logic as production so restart-idempotency works. pub struct TestInboxSink { inbox_root: PathBuf, } @@ -284,56 +284,58 @@ impl TestInboxSink { #[async_trait] impl ReceiveSink for TestInboxSink { async fn already_delivered(&self, doc: &DistributionDocument) -> bool { - let dir = self.inbox_root.join(&doc.distribution_id); - if !dir.is_dir() { - return false; + let rel = test_inbox_relpath(&doc.blob_metadata) + .unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id))); + match tokio::fs::metadata(self.inbox_root.join(rel)).await { + Ok(md) => md.is_file() && md.len() == doc.blob_size, + Err(_) => false, } - let mut iter = match tokio::fs::read_dir(&dir).await { - Ok(i) => i, - Err(_) => return false, - }; - while let Ok(Some(entry)) = iter.next_entry().await { - if entry - .path() - .file_name() - .and_then(|n| n.to_str()) - .is_some_and(|s| s.starts_with('.')) - { - continue; - } - if let Ok(md) = entry.metadata().await { - if md.is_file() && md.len() == doc.blob_size { - return true; - } - } - } - false } async fn deliver(&self, doc: &DistributionDocument, blob_path: &Path) -> anyhow::Result<()> { - let dir = self.inbox_root.join(&doc.distribution_id); - tokio::fs::create_dir_all(&dir).await?; - let filename = test_inbox_filename(&doc.blob_metadata, &doc.distribution_id); - let target = dir.join(&filename); - let tmp = dir.join(format!(".{filename}.partial")); + let rel = test_inbox_relpath(&doc.blob_metadata) + .unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id))); + let target = self.inbox_root.join(&rel); + let parent = target + .parent() + .map(Path::to_path_buf) + .unwrap_or_else(|| self.inbox_root.clone()); + tokio::fs::create_dir_all(&parent).await?; + let fname = target + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or("blob"); + let tmp = parent.join(format!(".{fname}.partial")); tokio::fs::copy(blob_path, &tmp).await?; tokio::fs::rename(&tmp, &target).await?; Ok(()) } } -fn test_inbox_filename(metadata: &BlobMetadata, distribution_id: &str) -> String { - if let Some(raw) = metadata.name.as_ref() { - let last = Path::new(raw) - .file_name() - .and_then(|s| s.to_str()) - .map(|s| s.trim_start_matches('.')) - .filter(|s| !s.is_empty()); - if let Some(name) = last { - return name.to_string(); +/// Resolve `blob_metadata.name` to a safe inbox-relative path, mirroring +/// `peat-node::attachments::inbox::inbox_relpath` (and peat-cli's `InboxSink`): +/// preserve subdirectories, reject any absolute/`..` component (caller falls +/// back to `.bin`). +fn test_inbox_relpath(metadata: &BlobMetadata) -> Option { + use std::path::Component; + + let raw = metadata.name.as_deref()?; + if raw.is_empty() { + return None; + } + let mut safe = PathBuf::new(); + for comp in Path::new(raw).components() { + match comp { + Component::Normal(c) => safe.push(c), + Component::CurDir => {} + _ => return None, } } - format!("{distribution_id}.bin") + if safe.as_os_str().is_empty() { + None + } else { + Some(safe) + } } /// A `TestPeer` with attachment infrastructure wired: an `IrohFileDistribution` @@ -341,8 +343,8 @@ fn test_inbox_filename(metadata: &BlobMetadata, distribution_id: &str) -> String /// /// * `file_dist` — call `file_dist.distribute(...)` to push blobs from the /// TestPeer side (used by receive-side scenarios that need TestPeer to send). -/// * `inbox_dir` — check `inbox_dir.path()/{dist_id}/{filename}` to assert -/// that a CLI-originated send arrived at the TestPeer. +/// * `inbox_dir` — check `inbox_dir.path()/{relative_path}` (the mirrored +/// outbox layout) to assert that a CLI-originated send arrived at the TestPeer. pub struct TestAttachPeer { pub peer: TestPeer, pub file_dist: Arc, diff --git a/src/main.rs b/src/main.rs index 1307591..643eac2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -258,7 +258,9 @@ struct Args { /// spawns a background watcher that polls the synced /// `file_distributions` collection, fetches any blob whose /// distribution doc targets this node's iroh endpoint, and writes - /// the bytes to `{inbox}/{distribution_id}/{filename}`. Unset + /// the bytes to `{inbox}/{relative_path}`, mirroring the sender's + /// outbox layout (a sender-supplied name that is absolute or contains + /// `..` falls back to `{inbox}/{distribution_id}.bin`). Unset /// (default) disables receive-side delivery — peers still see the /// sender's distribution doc via Automerge sync but no auto-pull /// happens. Created if missing.