From 18ebdfd12403d624aad0dd3396fba09abc69d18c Mon Sep 17 00:00:00 2001 From: Kit Plummer Date: Fri, 19 Jun 2026 13:37:36 -0700 Subject: [PATCH 1/4] feat(attachments): richer transaction logging + receive-side post-write validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both sides now describe the transfer by filename, and the receiver validates the written file before publishing it: - inbox sink (receive): log at info with filename, bytes, blob_hash, and target path; validate the on-disk copy's size against the distribution's blob_size before the tmp->target rename. On mismatch, drop the partial and return Err so the receive watcher retries instead of publishing a short file. (Content is already content-hash-verified by iroh on fetch — this is the local-write completeness gate.) - outbox watcher (send): auto-distribute log now includes bytes + sha256 alongside filename + distribution_id. Surfaced while watching a live two-node run: the prior receive log carried only distribution_id/blob_hash/size — no filename, no post-write check. --- CHANGELOG.md | 14 ++++++++++++++ src/attachments/inbox.rs | 27 ++++++++++++++++++++++++--- src/attachments/outbox.rs | 9 ++++++++- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cb04ee..d939183 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- **Attachment transaction logging now describes the file, both sides.** + - Receive side: the inbox sink logs at info with the `filename`, `bytes`, + `blob_hash`, and target path — not just the distribution id — and performs + a **post-write size validation** (the on-disk copy must match the declared + `blob_size` before the tmp→target rename is published). On mismatch it drops + the partial and returns an error so the receive watcher retries, rather than + publishing a short file. (Content integrity is already guaranteed upstream + by iroh's content-addressed fetch; this is the local-write completeness + check.) + - Send side: the outbox watcher's auto-distribute log now includes `bytes` + and the `sha256` alongside the filename and distribution id. + ## [0.4.7] - 2026-06-19 ### Changed diff --git a/src/attachments/inbox.rs b/src/attachments/inbox.rs index 625fbbd..6acf430 100644 --- a/src/attachments/inbox.rs +++ b/src/attachments/inbox.rs @@ -29,7 +29,7 @@ use std::path::{Path, PathBuf}; use peat_mesh::storage::blob_traits::BlobMetadata; use peat_protocol::storage::{DistributionDocument, ReceiveSink}; -use tracing::debug; +use tracing::info; // Re-export the test fault seam from its new home in peat-protocol so // existing integration tests (`peat_node::attachments::inbox::{...}`) @@ -106,11 +106,32 @@ impl ReceiveSink for FilesystemInboxSink { // tokio::fs::copy reads + writes asynchronously; for v1 sizes // (256 MiB cap on max_file_bytes) the buffered copy is fine. tokio::fs::copy(blob_path, &tmp).await?; + + // Post-write validation: the on-disk copy must match the distribution's + // declared size before we publish it (the tmp→target rename). Content + // integrity is already guaranteed upstream — iroh verifies the blob + // against its content hash on fetch — so a size match confirms the + // local write is complete and untruncated. On mismatch, drop the tmp + // and return Err so the receive watcher retries on the next sweep + // rather than publishing a short file. + let written = tokio::fs::metadata(&tmp).await?.len(); + if written != doc.blob_size { + let _ = tokio::fs::remove_file(&tmp).await; + anyhow::bail!( + "inbox write size mismatch for {filename} (dist {}): wrote {written} bytes, \ + expected {} — leaving for retry", + doc.distribution_id, + doc.blob_size + ); + } tokio::fs::rename(&tmp, &target).await?; - debug!( + info!( distribution_id = %doc.distribution_id, + filename = %filename, + bytes = written, + blob_hash = %doc.blob_hash, target = %target.display(), - "attachment written to inbox" + "attachment received, validated, and written to inbox" ); Ok(()) } diff --git a/src/attachments/outbox.rs b/src/attachments/outbox.rs index 22d2bb8..bd1b9a1 100644 --- a/src/attachments/outbox.rs +++ b/src/attachments/outbox.rs @@ -189,6 +189,7 @@ pub async fn run(node: Arc, roots: HashMap, poll: continue; } }; + let sha_hex = hex::encode(&sha256); let req = build_request(root_name, &f.relative_path, f.size, sha256); match super::handlers::send_attachments(&node, req).await { Ok(resp) => { @@ -197,7 +198,13 @@ pub async fn run(node: Arc, roots: HashMap, poll: .first() .map(|h| h.distribution_id.as_str()) .unwrap_or(""); - info!(file = %f.relative_path, distribution_id = %dist, "outbox: auto-distributed"); + info!( + file = %f.relative_path, + bytes = f.size, + sha256 = %sha_hex, + distribution_id = %dist, + "outbox: auto-distributed attachment" + ); } Err(e) => { // Mark sent regardless so we don't re-attempt the same From b1e1b23fbfdec9ac25e85fff93bdaf95ce5d9969 Mon Sep 17 00:00:00 2001 From: Kit Plummer Date: Fri, 19 Jun 2026 14:04:12 -0700 Subject: [PATCH 2/4] feat(node): startup version banner + --print-config - Banner: first log line reports peat-node's version + the resolved peat-mesh/peat-protocol/peat-schema versions (extracted from Cargo.lock by build.rs via cargo:rustc-env). Confirms exactly which build + RC stack a container runs, from the top of the logs. - --print-config / PEAT_NODE_PRINT_CONFIG: opt-in dump of the full resolved configuration at startup, with shared_key + encryption_key redacted. Folded into the observability PR alongside the attachment transaction logging. --- CHANGELOG.md | 11 +++++++++++ build.rs | 36 ++++++++++++++++++++++++++++++++++++ src/main.rs | 31 +++++++++++++++++++++++++++++-- 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d939183..0971688 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Startup version banner.** The first log line now reports peat-node's own + version plus the resolved versions of the core dependency stack — + `peat-mesh`, `peat-protocol`, `peat-schema` (captured from `Cargo.lock` at + build time). Lets an operator confirm exactly which build + mesh/protocol RC + a container is running from the top of the logs. +- **`--print-config` / `PEAT_NODE_PRINT_CONFIG`.** Opt-in flag that logs the + full resolved configuration at startup (shared key + encryption key redacted) + — for diagnosing env/flag/Compose wiring. + ### Changed - **Attachment transaction logging now describes the file, both sides.** diff --git a/build.rs b/build.rs index ae8c08c..d6bad39 100644 --- a/build.rs +++ b/build.rs @@ -1,3 +1,24 @@ +use std::fs; + +/// Extract a dependency's resolved version from `Cargo.lock` so peat-node can +/// log its dependency stack at startup. Returns "unknown" if not found. +fn locked_version(lock: &str, crate_name: &str) -> String { + let needle = format!("name = \"{crate_name}\""); + if let Some(idx) = lock.find(&needle) { + // Within a `[[package]]` block the `version = "..."` line follows the + // `name = "..."` line, so the first one after the match is ours. + if let Some(line) = lock[idx..] + .lines() + .find(|l| l.trim_start().starts_with("version = ")) + { + if let Some(v) = line.split('"').nth(1) { + return v.to_string(); + } + } + } + "unknown".to_string() +} + fn main() -> Result<(), Box> { connectrpc_build::Config::new() .files(&["proto/sidecar.proto"]) @@ -5,6 +26,21 @@ fn main() -> Result<(), Box> { .include_file("_connectrpc.rs") .compile()?; + // Surface the resolved versions of the core dependency stack as + // compile-time env vars so peat-node can print them in its startup banner. + let lock = fs::read_to_string("Cargo.lock").unwrap_or_default(); + for (crate_name, env_var) in [ + ("peat-mesh", "PEAT_MESH_VERSION"), + ("peat-protocol", "PEAT_PROTOCOL_VERSION"), + ("peat-schema", "PEAT_SCHEMA_VERSION"), + ] { + println!( + "cargo:rustc-env={env_var}={}", + locked_version(&lock, crate_name) + ); + } + println!("cargo:rerun-if-changed=proto/sidecar.proto"); + println!("cargo:rerun-if-changed=Cargo.lock"); Ok(()) } diff --git a/src/main.rs b/src/main.rs index bce795e..1307591 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,9 +22,13 @@ use peat_node::pb::PeatSidecarExt; use peat_node::service::PeatSidecarService; use peat_node::watcher; -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] #[command(name = "peat-node", about = "Peat CRDT mesh node with Connect RPC API")] struct Args { + /// Log the full resolved configuration at startup (secrets redacted). + #[arg(long, env = "PEAT_NODE_PRINT_CONFIG", default_value = "false")] + print_config: bool, + /// Optional subcommand. With none, peat-node runs the mesh node (default). #[command(subcommand)] command: Option, @@ -327,7 +331,7 @@ struct Args { discovery_interval_secs: u64, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Debug, Clone)] enum Command { /// Print the deterministic iroh `EndpointId` for a `(shared-key, node-id)` /// pair, offline — no node boot, no network, no access to the peer. @@ -378,6 +382,18 @@ async fn main() -> anyhow::Result<()> { ) .init(); + // Version banner at the top of the logs: peat-node's own version plus the + // resolved versions of the core dependency stack (captured from Cargo.lock + // by build.rs). Lets an operator confirm exactly which build + mesh/protocol + // RC a container is running from the first log line. + info!( + version = env!("CARGO_PKG_VERSION"), + peat_mesh = env!("PEAT_MESH_VERSION"), + peat_protocol = env!("PEAT_PROTOCOL_VERSION"), + peat_schema = env!("PEAT_SCHEMA_VERSION"), + "peat-node version + dependency stack" + ); + // Treat any empty `PEAT_NODE_*` env var as unset before clap parses. // Compose/Helm routinely inject empty-string env vars for "disabled" // optional settings (e.g. `PEAT_NODE_ATTACHMENT_INBOX=""`); clap otherwise @@ -422,6 +438,17 @@ async fn main() -> anyhow::Result<()> { "starting peat-node" ); + // Full resolved-configuration dump (opt-in via --print-config / + // PEAT_NODE_PRINT_CONFIG). Secrets are redacted before logging. + if args.print_config { + let mut redacted = args.clone(); + redacted.shared_key = "".to_string(); + if redacted.encryption_key.is_some() { + redacted.encryption_key = Some("".to_string()); + } + info!("resolved configuration (PEAT_NODE_PRINT_CONFIG):\n{redacted:#?}"); + } + tokio::fs::create_dir_all(&args.data_dir).await?; // Deterministic iroh identity (peat-node#63 gap-4d): seed the keypair from From 1fededf85e4d125c67dd8c753422b4e89ec4f743 Mon Sep 17 00:00:00 2001 From: Kit Plummer Date: Fri, 19 Jun 2026 20:39:15 -0700 Subject: [PATCH 3/4] fix(attachments): write inbox files at original relative path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Received attachments were landing at {inbox}/{distribution_id}/{basename}, losing the original filename behind a UUID dir and flattening any subdirectories. Mirror the sender's outbox layout instead: a file dropped at outbox/sub/report.pdf now lands at inbox/sub/report.pdf with its name and subdirs intact, and re-delivery of the same path overwrites (latest-wins). - ingest: build_blob_metadata carries the full relative_path, not the basename - inbox: inbox_relpath replaces inbox_filename — resolves the sender-provided name to a path relative to the inbox root, preserving subdirectories - inbox: deliver writes at inbox/ (parent dirs created, tmp+ rename in the parent), dropping the distribution_id wrapper dir - already_delivered keys on the resolved target path Path-traversal hardening preserved and tightened: only Normal path components are accepted, so an absolute path or one containing '..' can never escape the inbox — such a name falls back to a flat {distribution_id}.bin at the root. New tests cover subdir preservation, traversal/absolute rejection, the flat fallback on a hostile name, and the size-mismatch bail. --- src/attachments/inbox.rs | 311 +++++++++++++++++++------------------- src/attachments/ingest.rs | 10 +- 2 files changed, 158 insertions(+), 163 deletions(-) diff --git a/src/attachments/inbox.rs b/src/attachments/inbox.rs index 6acf430..9a33e28 100644 --- a/src/attachments/inbox.rs +++ b/src/attachments/inbox.rs @@ -14,14 +14,19 @@ //! //! # Inbox layout //! -//! `{inbox_root}/{distribution_id}/{filename}` where `filename` comes -//! from `BlobMetadata.name` (set by the sender's `build_blob_metadata` -//! from `display_name` or the basename of `relative_path`), sanitized -//! to remove path separators. Distribution-ID namespacing avoids -//! collisions when two different distributions target the same logical -//! filename. Applications watching the inbox can correlate the -//! distribution_id back to the sender via -//! `GetAttachmentDistribution(distribution_id)`. +//! `{inbox_root}/{relative_path}` — the inbox mirrors the sender's outbox +//! layout, so a file dropped at `outbox/sub/report.pdf` lands at +//! `inbox/sub/report.pdf` with its original name and subdirectories intact. +//! The relative path comes from `BlobMetadata.name` (set by the sender's +//! `build_blob_metadata` from `display_name` or the full `relative_path`). +//! Re-delivery of the same path overwrites (latest-wins). +//! +//! Because the sender controls `BlobMetadata.name`, it is re-sanitised on +//! arrival ([`inbox_relpath`]): only `Normal` path components are accepted, so +//! an absolute path or one containing `..` can never escape the inbox — such a +//! name falls back to a flat `{distribution_id}.bin` at the inbox root. +//! Applications watching the inbox can still correlate a delivery back to the +//! sender via `GetAttachmentDistribution(distribution_id)`. #![allow(clippy::result_large_err)] @@ -55,53 +60,46 @@ impl FilesystemInboxSink { #[async_trait::async_trait] impl ReceiveSink for FilesystemInboxSink { - /// Filesystem-based "already delivered" gate. The - /// distribution-id-namespaced directory is the durable source of - /// truth: a long-running receiver that restarts doesn't re-fetch - /// and re-write every historical delivery. The "matching-size + - /// non-hidden" rule treats the existence of any regular file with - /// the declared blob_size in `{inbox}/{distribution_id}/` as proof - /// of prior delivery. Returns false on any I/O error so the caller - /// falls through to the fetch path and retries — better to - /// re-deliver than to silently skip a file that ought to land. + /// Filesystem-based "already delivered" gate, keyed on the file's mirrored + /// path (`{inbox}/{relative_path}`). Restart idempotency: a long-running + /// receiver that restarts doesn't re-fetch a delivery whose target already + /// holds a regular file of the declared `blob_size`. Size-only — the bytes + /// are content-verified by iroh on fetch. Returns false on any I/O error + /// (and on the rare same-path/same-size-but-different-content case) so the + /// caller re-delivers rather than silently skipping a file that should land. async fn already_delivered(&self, doc: &DistributionDocument) -> bool { - let dir = self.inbox_root.join(&doc.distribution_id); - if !dir.is_dir() { - return false; - } - let mut iter = match tokio::fs::read_dir(&dir).await { - Ok(i) => i, - Err(_) => return false, - }; - while let Ok(Some(entry)) = iter.next_entry().await { - let path = entry.path(); - // Skip our own in-flight `.{name}.partial` markers. - if path - .file_name() - .and_then(|n| n.to_str()) - .is_some_and(|s| s.starts_with('.')) - { - continue; - } - if let Ok(md) = entry.metadata().await { - if md.is_file() && md.len() == doc.blob_size { - return true; - } - } + let rel = inbox_relpath(&doc.blob_metadata) + .unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id))); + match tokio::fs::metadata(self.inbox_root.join(rel)).await { + Ok(md) => md.is_file() && md.len() == doc.blob_size, + Err(_) => false, } - false } - /// Copy the blob bytes into `{inbox}/{distribution_id}/{filename}` - /// via a tmp-file + rename pair so readers never see a partial - /// file. + /// Write the blob to `{inbox}/{relative_path}`, mirroring the sender's + /// layout (so `outbox/sub/demo.txt` lands at `inbox/sub/demo.txt`), via a + /// tmp-file + rename so readers never see a partial file. Re-delivery of the + /// same path overwrites (latest-wins). The relative path is re-sanitised + /// here ([`inbox_relpath`]) — the sender controls `blob_metadata.name`, so a + /// name that is absolute or contains `..` is rejected and the file lands at + /// a flat `.bin` at the inbox root instead of escaping it. async fn deliver(&self, doc: &DistributionDocument, blob_path: &Path) -> anyhow::Result<()> { - let dir = self.inbox_root.join(&doc.distribution_id); - tokio::fs::create_dir_all(&dir).await?; + let rel = inbox_relpath(&doc.blob_metadata) + .unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id))); + let target = self.inbox_root.join(&rel); - let filename = inbox_filename(&doc.blob_metadata, &doc.distribution_id); - let target = dir.join(&filename); - let tmp = dir.join(format!(".{filename}.partial")); + // Create the (possibly nested) parent dir, and stage the tmp file in it + // so the publishing rename is atomic on the same filesystem. + let parent = target + .parent() + .map(Path::to_path_buf) + .unwrap_or_else(|| self.inbox_root.clone()); + tokio::fs::create_dir_all(&parent).await?; + let fname = target + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or("blob"); + let tmp = parent.join(format!(".{fname}.partial")); // tokio::fs::copy reads + writes asynchronously; for v1 sizes // (256 MiB cap on max_file_bytes) the buffered copy is fine. @@ -118,8 +116,9 @@ impl ReceiveSink for FilesystemInboxSink { if written != doc.blob_size { let _ = tokio::fs::remove_file(&tmp).await; anyhow::bail!( - "inbox write size mismatch for {filename} (dist {}): wrote {written} bytes, \ + "inbox write size mismatch for {} (dist {}): wrote {written} bytes, \ expected {} — leaving for retry", + rel.display(), doc.distribution_id, doc.blob_size ); @@ -127,7 +126,7 @@ impl ReceiveSink for FilesystemInboxSink { tokio::fs::rename(&tmp, &target).await?; info!( distribution_id = %doc.distribution_id, - filename = %filename, + filename = %rel.display(), bytes = written, blob_hash = %doc.blob_hash, target = %target.display(), @@ -137,24 +136,38 @@ impl ReceiveSink for FilesystemInboxSink { } } -/// Derive a safe inbox filename from the blob metadata. Strips path -/// separators; falls back to `.bin` if metadata has -/// no name or the name sanitises to empty. -fn inbox_filename(metadata: &BlobMetadata, distribution_id: &str) -> String { - if let Some(raw) = metadata.name.as_ref() { - // Take only the last path component; strip leading dots so - // a malicious sender can't smuggle hidden files past an - // operator scanning ls -l on the inbox. - let last = std::path::Path::new(raw) - .file_name() - .and_then(|s| s.to_str()) - .map(|s| s.trim_start_matches('.')) - .filter(|s| !s.is_empty()); - if let Some(name) = last { - return name.to_string(); +/// Resolve the sender-provided `blob_metadata.name` into a safe path **relative +/// to the inbox root**, preserving subdirectories so the inbox mirrors the +/// sender's layout (`outbox/sub/demo.txt` → `inbox/sub/demo.txt`). +/// +/// Path-traversal guard: the sender controls `name`, so only `Normal` path +/// components are accepted. Any absolute path, `..`, root, or drive-prefix +/// component makes this return `None`, and the caller falls back to a flat +/// `.bin` at the inbox root — a malicious or malformed name +/// can never write outside the inbox. Returns `None` for a missing/empty name +/// or one that sanitises to nothing. +fn inbox_relpath(metadata: &BlobMetadata) -> Option { + use std::path::Component; + + let raw = metadata.name.as_deref()?; + if raw.is_empty() { + return None; + } + let mut safe = PathBuf::new(); + for comp in std::path::Path::new(raw).components() { + match comp { + Component::Normal(c) => safe.push(c), + Component::CurDir => {} // "." — ignore + // ".." (ParentDir), "/" (RootDir), or a Windows prefix — reject the + // whole name rather than try to repair it. + _ => return None, } } - format!("{distribution_id}.bin") + if safe.as_os_str().is_empty() { + None + } else { + Some(safe) + } } #[cfg(test)] @@ -164,16 +177,20 @@ mod tests { use peat_protocol::storage::{DistributionScope, TransferPriority}; use std::collections::HashMap; + fn meta(name: Option<&str>) -> BlobMetadata { + BlobMetadata { + name: name.map(String::from), + content_type: None, + custom: HashMap::new(), + } + } + fn doc_with(distribution_id: &str, blob_size: u64, name: Option<&str>) -> DistributionDocument { DistributionDocument { distribution_id: distribution_id.to_string(), blob_hash: "deadbeef".to_string(), blob_size, - blob_metadata: BlobMetadata { - name: name.map(|s| s.to_string()), - content_type: None, - custom: HashMap::new(), - }, + blob_metadata: meta(name), scope: DistributionScope::AllNodes, priority: TransferPriority::Normal, target_nodes: vec![], @@ -186,142 +203,120 @@ mod tests { } #[test] - fn inbox_filename_uses_metadata_name() { - let m = BlobMetadata { - name: Some("report.pdf".into()), - content_type: None, - custom: HashMap::new(), - }; - assert_eq!(inbox_filename(&m, "dist-X"), "report.pdf"); - } - - #[test] - fn inbox_filename_strips_path_components() { - let m = BlobMetadata { - name: Some("/etc/passwd".into()), - content_type: None, - custom: HashMap::new(), - }; - // Path::file_name on "/etc/passwd" returns "passwd" — the leading - // segments are stripped so a sender cannot use the metadata name - // to redirect writes outside the inbox subdirectory. - assert_eq!(inbox_filename(&m, "dist-X"), "passwd"); - } - - #[test] - fn inbox_filename_strips_leading_dot() { - let m = BlobMetadata { - name: Some(".bashrc".into()), - content_type: None, - custom: HashMap::new(), - }; - assert_eq!(inbox_filename(&m, "dist-X"), "bashrc"); + fn relpath_preserves_subdirs() { + assert_eq!( + inbox_relpath(&meta(Some("report.pdf"))), + Some(PathBuf::from("report.pdf")) + ); + assert_eq!( + inbox_relpath(&meta(Some("sub/dir/report.pdf"))), + Some(PathBuf::from("sub/dir/report.pdf")) + ); } #[test] - fn inbox_filename_falls_back_on_empty_metadata() { - let m = BlobMetadata { - name: None, - content_type: None, - custom: HashMap::new(), - }; - assert_eq!(inbox_filename(&m, "dist-X"), "dist-X.bin"); + fn relpath_rejects_traversal_and_absolute() { + // The sender controls `name`; these must never resolve to a path that + // could escape the inbox — reject them so the caller uses the fallback. + assert_eq!(inbox_relpath(&meta(Some("../../etc/passwd"))), None); + assert_eq!(inbox_relpath(&meta(Some("/etc/passwd"))), None); + assert_eq!(inbox_relpath(&meta(Some("a/../../b"))), None); } #[test] - fn inbox_filename_falls_back_on_dotfile_that_strips_to_empty() { - let m = BlobMetadata { - name: Some("...".into()), - content_type: None, - custom: HashMap::new(), - }; - // "..." has file_name "..." → strip leading dots → empty → - // fallback to distribution_id-based name. - assert_eq!(inbox_filename(&m, "dist-X"), "dist-X.bin"); + fn relpath_none_for_missing_or_empty() { + assert_eq!(inbox_relpath(&meta(None)), None); + assert_eq!(inbox_relpath(&meta(Some(""))), None); + assert_eq!(inbox_relpath(&meta(Some("./"))), None); } #[tokio::test] - async fn already_delivered_false_when_dir_absent() { + async fn already_delivered_false_when_absent() { let tmp = tempfile::tempdir().unwrap(); let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); assert!( !sink - .already_delivered(&doc_with("never-delivered", 100, None)) + .already_delivered(&doc_with("d", 100, Some("a.txt"))) .await ); } #[tokio::test] - async fn already_delivered_true_when_matching_size_present() { + async fn already_delivered_true_when_matching_size() { let tmp = tempfile::tempdir().unwrap(); - let dir = tmp.path().join("dist-X"); - tokio::fs::create_dir_all(&dir).await.unwrap(); - let payload = vec![0u8; 1024]; - tokio::fs::write(dir.join("got.bin"), &payload) + tokio::fs::write(tmp.path().join("a.txt"), vec![0u8; 1024]) .await .unwrap(); let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); assert!( - sink.already_delivered(&doc_with("dist-X", 1024, None)) + sink.already_delivered(&doc_with("d", 1024, Some("a.txt"))) .await ); } #[tokio::test] async fn already_delivered_false_when_size_differs() { - // PRD §Validation Rule 9 guarantees content+size match before - // ingest; this is the conservative case where the previous - // delivery exists but was for a different blob (e.g. - // distribution_id collision across formations or a manual - // file drop). Re-fetching with the new size is the safe move. let tmp = tempfile::tempdir().unwrap(); - let dir = tmp.path().join("dist-X"); - tokio::fs::create_dir_all(&dir).await.unwrap(); - tokio::fs::write(dir.join("got.bin"), b"shorter") + tokio::fs::write(tmp.path().join("a.txt"), b"short") .await .unwrap(); let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); assert!( !sink - .already_delivered(&doc_with("dist-X", 1024, None)) + .already_delivered(&doc_with("d", 1024, Some("a.txt"))) .await ); } #[tokio::test] - async fn already_delivered_ignores_partial_marker() { - // `.{filename}.partial` is the in-flight tmp file the sink - // writes before atomic rename. If a crash leaves one behind - // it must NOT count as a successful delivery. - let tmp = tempfile::tempdir().unwrap(); - let dir = tmp.path().join("dist-X"); - tokio::fs::create_dir_all(&dir).await.unwrap(); - tokio::fs::write(dir.join(".got.bin.partial"), vec![0u8; 1024]) - .await - .unwrap(); - let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); - assert!( - !sink - .already_delivered(&doc_with("dist-X", 1024, None)) - .await - ); - } - - #[tokio::test] - async fn deliver_writes_file_and_is_idempotent() { + async fn deliver_mirrors_relative_path_and_is_idempotent() { let tmp = tempfile::tempdir().unwrap(); let src = tempfile::NamedTempFile::new().unwrap(); std::fs::write(src.path(), b"hello world").unwrap(); let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); - let doc = doc_with("dist-Y", 11, Some("greeting.txt")); + let doc = doc_with("dist-Y", 11, Some("sub/dir/greeting.txt")); sink.deliver(&doc, src.path()).await.unwrap(); - let landed = tmp.path().join("dist-Y").join("greeting.txt"); + // Mirrors the sender's relative path — NOT a {distribution_id} dir. + let landed = tmp.path().join("sub").join("dir").join("greeting.txt"); assert_eq!(std::fs::read(&landed).unwrap(), b"hello world"); assert!(sink.already_delivered(&doc).await); - // Re-deliver: atomic rename overwrites, no partial left behind. + // Re-deliver overwrites (latest-wins); no partial left behind. sink.deliver(&doc, src.path()).await.unwrap(); assert_eq!(std::fs::read(&landed).unwrap(), b"hello world"); + assert!(!tmp + .path() + .join("sub") + .join("dir") + .join(".greeting.txt.partial") + .exists()); + } + + #[tokio::test] + async fn deliver_traversal_name_stays_inside_inbox() { + let tmp = tempfile::tempdir().unwrap(); + let src = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(src.path(), b"x").unwrap(); + let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); + // A hostile name resolves to the flat fallback inside the inbox, never + // outside it. + let doc = doc_with("dist-evil", 1, Some("../../../../tmp/pwned")); + sink.deliver(&doc, src.path()).await.unwrap(); + assert!(tmp.path().join("dist-evil.bin").is_file()); + } + + #[tokio::test] + async fn deliver_size_mismatch_bails_without_publishing() { + let tmp = tempfile::tempdir().unwrap(); + let src = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(src.path(), b"hello world").unwrap(); // 11 bytes + let sink = FilesystemInboxSink::new(tmp.path().to_path_buf()); + let doc = doc_with("dist-Z", 99, Some("x.txt")); // declares 99 + assert!(sink.deliver(&doc, src.path()).await.is_err()); + assert!( + !tmp.path().join("x.txt").exists(), + "a short file must not be published" + ); } } diff --git a/src/attachments/ingest.rs b/src/attachments/ingest.rs index 49c690f..12a7d89 100644 --- a/src/attachments/ingest.rs +++ b/src/attachments/ingest.rs @@ -180,14 +180,14 @@ async fn ingest_file( } fn build_blob_metadata(file: &ValidatedFile) -> BlobMetadata { + // Carry the FULL relative path (forward-slashed), not just the basename, so + // the receiver can mirror the sender's layout (inbox/) + // instead of flattening every file to its basename. The receiver + // re-sanitises this against path traversal before use. `display_name` still + // overrides when a caller set one explicitly. let name = file .display_name .clone() - .or_else(|| { - Path::new(&file.relative_path) - .file_name() - .and_then(|n| n.to_str().map(String::from)) - }) .or_else(|| Some(file.relative_path.clone())); BlobMetadata { From 409a54676d2b12fbd47a02d14ebcf1e2e1cac905 Mon Sep 17 00:00:00 2001 From: Kit Plummer Date: Fri, 19 Jun 2026 20:55:52 -0700 Subject: [PATCH 4/4] test(attachments): assert mirrored inbox layout (no distribution_id dir) The delivery tests still expected the old {inbox}/{distribution_id}/{name} layout and broke once attachments started landing at inbox/. - attachment-delivery-compose.sh: poll inbox/payload.bin, not inbox/$DIST/... - attachments_e2e_test: await_inbox_file walks the inbox tree by content (new find_matching_file helper) instead of keying on the distribution_id; the NodeList negative-case scans C's whole inbox for the payload bytes Verified: all 4 two-node e2e tests pass, and the compose delivery script passes against the fix image (2 MiB payload, byte-identical, ~2s). --- test/attachment-delivery-compose.sh | 5 +- tests/attachments_e2e_test.rs | 158 +++++++++++----------------- 2 files changed, 64 insertions(+), 99 deletions(-) diff --git a/test/attachment-delivery-compose.sh b/test/attachment-delivery-compose.sh index 075c14d..6cfad32 100755 --- a/test/attachment-delivery-compose.sh +++ b/test/attachment-delivery-compose.sh @@ -125,8 +125,11 @@ DIST="$(echo "$RESP" | jq -r '.handles[0].distributionId // empty')" log "distribution_id=$DIST" # ---- The assertion that matters: byte-identical file on the receiver -------- +# The inbox mirrors the sender's outbox layout: a file sent with +# relativePath="payload.bin" lands at inbox/payload.bin (original name, no +# distribution_id wrapper dir). log "Polling receiver inbox for delivered bytes (up to 40s)" -RECV="$WORK/inbox/$DIST/payload.bin" +RECV="$WORK/inbox/payload.bin" for i in $(seq 1 40); do if [ -f "$RECV" ]; then RSHA="$(openssl dgst -sha256 -binary "$RECV" | base64)" diff --git a/tests/attachments_e2e_test.rs b/tests/attachments_e2e_test.rs index f34edce..dcaf415 100644 --- a/tests/attachments_e2e_test.rs +++ b/tests/attachments_e2e_test.rs @@ -159,54 +159,63 @@ fn b64(bytes: &[u8]) -> String { base64::engine::general_purpose::STANDARD.encode(bytes) } -/// Poll the inbox until a file with `expected` bytes appears under -/// `{inbox}/{distribution_id}/...`. Returns the path. Callers pass a 60s -/// deadline: successful delivery lands in under 3 seconds (iroh handshake + one -/// watcher tick), but a CPU-contended CI runner spinning up multiple two-node -/// iroh meshes needs generous headroom (a 30s budget flaked — peat-node SKILL -/// gotcha on the `iroh_two_node` serial tests). A timeout here is a real -/// delivery failure, not slowness. -async fn await_inbox_file( - inbox: &std::path::Path, - distribution_id: &str, - expected: &[u8], - deadline: Duration, -) -> PathBuf { +/// Poll the inbox tree until a file with `expected` bytes appears anywhere under +/// `inbox`. The inbox mirrors the sender's outbox layout (`inbox/`, +/// e.g. `inbox/delivery.bin` or `inbox/sub/x.bin`) — no distribution_id wrapper +/// dir — so we search recursively rather than keying on the distribution_id. +/// Returns the path. Callers pass a 60s deadline: successful delivery lands in +/// under 3 seconds (iroh handshake + one watcher tick), but a CPU-contended CI +/// runner spinning up multiple two-node iroh meshes needs generous headroom (a +/// 30s budget flaked — peat-node SKILL gotcha on the `iroh_two_node` serial +/// tests). A timeout here is a real delivery failure, not slowness. +async fn await_inbox_file(inbox: &std::path::Path, expected: &[u8], deadline: Duration) -> PathBuf { let deadline_at = Instant::now() + deadline; - let dist_dir = inbox.join(distribution_id); while Instant::now() < deadline_at { - if dist_dir.is_dir() { - // Look for any file in the distribution-id subdirectory. - if let Ok(mut iter) = tokio::fs::read_dir(&dist_dir).await { - while let Ok(Some(entry)) = iter.next_entry().await { - let path = entry.path(); - // Skip our own in-flight tmp marker - if path - .file_name() - .and_then(|n| n.to_str()) - .is_some_and(|s| s.starts_with('.')) - { - continue; - } - if path.is_file() { - if let Ok(actual) = tokio::fs::read(&path).await { - if actual == expected { - return path; - } - } - } - } - } + if let Some(found) = find_matching_file(inbox, expected) { + return found; } tokio::time::sleep(Duration::from_millis(100)).await; } panic!( - "no file with the expected content appeared in {} within {:?}", - dist_dir.display(), + "no file with the expected content appeared under {} within {:?}", + inbox.display(), deadline ); } +/// Recursively search `root` for a regular file whose bytes equal `expected`, +/// skipping in-flight `.partial` tmp markers (dotfiles). Synchronous `std::fs` +/// is fine in this test helper. +fn find_matching_file(root: &std::path::Path, expected: &[u8]) -> Option { + let mut stack = vec![root.to_path_buf()]; + while let Some(dir) = stack.pop() { + let Ok(entries) = std::fs::read_dir(&dir) else { + continue; + }; + for entry in entries.flatten() { + let path = entry.path(); + if path + .file_name() + .and_then(|n| n.to_str()) + .is_some_and(|s| s.starts_with('.')) + { + continue; // in-flight tmp marker + } + let Ok(ft) = entry.file_type() else { continue }; + if ft.is_dir() { + stack.push(path); + } else if ft.is_file() { + if let Ok(actual) = std::fs::read(&path) { + if actual == expected { + return Some(path); + } + } + } + } + } + None +} + /// Boot A + B, peer them, send a real file from A, assert the *same /// bytes* land on B's filesystem inbox under the distribution_id /// subdirectory. This is the missing acceptance: prior to this test, @@ -317,13 +326,7 @@ async fn end_to_end_attachment_delivery_two_nodes() { // The assertion that matters: B's filesystem inbox eventually // contains a file with the same bytes. 30-second timeout gives // iroh + watcher headroom but successful runs land in <3 seconds. - let received_path = await_inbox_file( - &b.inbox_path, - &distribution_id, - &payload, - Duration::from_secs(60), - ) - .await; + let received_path = await_inbox_file(&b.inbox_path, &payload, Duration::from_secs(60)).await; // Per-byte and sha256 cross-check on the received file. let received = tokio::fs::read(&received_path).await.unwrap(); @@ -453,19 +456,14 @@ async fn node_list_scope_only_delivers_to_listed_nodes() { }), ) .await; - let distribution_id = resp["handles"][0]["distributionId"] + // Assert the RPC returned a distribution_id; the inbox no longer namespaces + // by it, so we don't need the value itself for the on-disk assertions. + resp["handles"][0]["distributionId"] .as_str() - .expect("SendAttachments must return a distribution_id") - .to_string(); + .expect("SendAttachments must return a distribution_id"); // B receives. - let b_path = await_inbox_file( - &b.inbox_path, - &distribution_id, - &payload, - Duration::from_secs(60), - ) - .await; + let b_path = await_inbox_file(&b.inbox_path, &payload, Duration::from_secs(60)).await; let b_bytes = tokio::fs::read(&b_path).await.unwrap(); assert_eq!( sha256_of(&b_bytes), @@ -478,35 +476,11 @@ async fn node_list_scope_only_delivers_to_listed_nodes() { // on the synced doc if it were going to. tokio::time::sleep(Duration::from_secs(3)).await; - // C does NOT receive. - let c_dist_dir = c.inbox_path.join(&distribution_id); - let c_has_payload = if c_dist_dir.is_dir() { - // Allow the directory to exist but assert no non-hidden file of - // the payload's size lives there. The watcher's `already_delivered` - // gate uses size-matching, and a sender-controlled empty dir - // wouldn't be a delivery anyway. Inspecting size is more direct - // than relying on the absence of the dir itself. - let mut iter = tokio::fs::read_dir(&c_dist_dir).await.unwrap(); - let mut found_payload = false; - while let Ok(Some(entry)) = iter.next_entry().await { - let p = entry.path(); - if p.file_name() - .and_then(|n| n.to_str()) - .is_some_and(|s| s.starts_with('.')) - { - continue; - } - if let Ok(md) = entry.metadata().await { - if md.is_file() && md.len() == payload.len() as u64 { - found_payload = true; - break; - } - } - } - found_payload - } else { - false - }; + // C does NOT receive. Scan C's entire inbox tree (the inbox mirrors the + // sender's relative path, so there's no distribution_id dir to look under) + // for the exact payload bytes — `find_matching_file` skips in-flight tmp + // markers, and a byte-exact match is a stronger signal than size alone. + let c_has_payload = find_matching_file(&c.inbox_path, &payload).is_some(); assert!( !c_has_payload, "C must NOT receive the file — NodeListScope was [{b_short}] only. \ @@ -619,13 +593,7 @@ async fn receiver_writes_node_status_into_distribution_doc() { // First, wait for byte-on-disk delivery — that's the sync point // after which both node-status writes must have run. - let _ = await_inbox_file( - &b.inbox_path, - &distribution_id, - &payload, - Duration::from_secs(60), - ) - .await; + let _ = await_inbox_file(&b.inbox_path, &payload, Duration::from_secs(60)).await; // Poll the receiver's local distribution doc until `node_statuses` // for this node reaches `Completed`, with a 15-second timeout. The @@ -771,13 +739,7 @@ async fn sender_get_attachment_distribution_reaches_completed_two_nodes() { // Sync point: bytes land on B's inbox, after which B's // Completed node-status write propagates back to A. - let _ = await_inbox_file( - &b.inbox_path, - &distribution_id, - &payload, - Duration::from_secs(60), - ) - .await; + let _ = await_inbox_file(&b.inbox_path, &payload, Duration::from_secs(60)).await; // Poll A's unary GetAttachmentDistribution until it reports // COMPLETED. 30s ceiling: byte delivery already happened, so this