Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/peat-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ serial_test = "3"
# Unique per-run formation id in the mDNS e2e test so concurrent runs on
# the shared self-hosted runner can't discover each other over loopback.
uuid = { version = "1", features = ["v4"] }
# Builds a DistributionDocument fixture (`started_at: DateTime<Utc>`) for the
# InboxSink layout regression test. Matches the version peat-mesh resolves.
chrono = "0.4"
243 changes: 188 additions & 55 deletions crates/peat-cli/src/cli/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! distribution from the synced Automerge store.
//! * `peat attach watch [--inbox <dir>]` — start a receive watcher that polls
//! for incoming distribution documents targeting this node and writes each
//! blob to `<inbox>/<distribution-id>/<filename>`. Runs until `SIGINT`
//! blob to `<inbox>/<relative-path>` (mirroring the outbox layout). Runs until `SIGINT`
//! unless `--dist-id` is given, in which case it exits once that specific
//! distribution has been delivered.

Expand Down Expand Up @@ -314,9 +314,12 @@ fn parse_priority(s: &str) -> Result<TransferPriority, CliError> {
}
}

/// Inbox receive sink: writes each delivered blob to
/// `{inbox_root}/{distribution_id}/{filename}` via a tmp-then-rename pair.
/// Mirrors the `FilesystemInboxSink` in `peat-node::attachments::inbox`.
/// Inbox receive sink: writes each delivered blob to `{inbox_root}/{relative_path}`,
/// mirroring the sender's outbox layout (so `outbox/sub/report.pdf` lands at
/// `inbox/sub/report.pdf`) via a tmp-then-rename pair. Re-delivery of the same
/// path overwrites (latest-wins). Faithfully mirrors the `FilesystemInboxSink` in
/// `peat-node::attachments::inbox` — including the path-traversal guard and the
/// post-write size validation; keep the two in lockstep.
///
/// When constructed with [`InboxSink::new_with_notify`], fires the provided
/// `Notify` after the target distribution's blob is successfully renamed into
Expand Down Expand Up @@ -346,40 +349,54 @@ impl InboxSink {

#[async_trait]
impl ReceiveSink for InboxSink {
/// "Already delivered" gate keyed on the mirrored path
/// (`{inbox}/{relative_path}`): a restarted watcher doesn't re-fetch a
/// delivery whose target already holds a regular file of the declared size.
async fn already_delivered(&self, doc: &DistributionDocument) -> bool {
let dir = self.inbox_root.join(&doc.distribution_id);
if !dir.is_dir() {
return false;
let rel = inbox_relpath(&doc.blob_metadata)
.unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id)));
match tokio::fs::metadata(self.inbox_root.join(rel)).await {
Ok(md) => md.is_file() && md.len() == doc.blob_size,
Err(_) => false,
}
let mut iter = match tokio::fs::read_dir(&dir).await {
Ok(i) => i,
Err(_) => return false,
};
while let Ok(Some(entry)) = iter.next_entry().await {
if entry
.path()
.file_name()
.and_then(|n| n.to_str())
.is_some_and(|s| s.starts_with('.'))
{
continue;
}
if let Ok(md) = entry.metadata().await {
if md.is_file() && md.len() == doc.blob_size {
return true;
}
}
}
false
}

async fn deliver(&self, doc: &DistributionDocument, blob_path: &Path) -> anyhow::Result<()> {
let dir = self.inbox_root.join(&doc.distribution_id);
tokio::fs::create_dir_all(&dir).await?;
let filename = inbox_filename(&doc.blob_metadata, &doc.distribution_id);
let target = dir.join(&filename);
let tmp = dir.join(format!(".{filename}.partial"));
let rel = inbox_relpath(&doc.blob_metadata)
.unwrap_or_else(|| PathBuf::from(format!("{}.bin", doc.distribution_id)));
let target = self.inbox_root.join(&rel);

// Create the (possibly nested) parent dir and stage the tmp file in it
// so the publishing rename is atomic on the same filesystem.
let parent = target
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| self.inbox_root.clone());
tokio::fs::create_dir_all(&parent).await?;
let fname = target
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("blob");
let tmp = parent.join(format!(".{fname}.partial"));

tokio::fs::copy(blob_path, &tmp).await?;

// Post-write size validation before the publishing rename — matches
// FilesystemInboxSink. iroh already content-verifies the blob on fetch,
// so a size match confirms a complete, untruncated local write. On
// mismatch, drop the tmp and bail so the watcher retries rather than
// publishing a short file.
let written = tokio::fs::metadata(&tmp).await?.len();
if written != doc.blob_size {
let _ = tokio::fs::remove_file(&tmp).await;
anyhow::bail!(
"inbox write size mismatch for {} (dist {}): wrote {written} bytes, \
expected {} — leaving for retry",
rel.display(),
doc.distribution_id,
doc.blob_size
);
}
tokio::fs::rename(&tmp, &target).await?;
if let Some((target_id, notify)) = &self.delivery_signal {
if doc.distribution_id == *target_id {
Expand All @@ -390,22 +407,39 @@ impl ReceiveSink for InboxSink {
}
}

/// Derive a safe inbox filename from blob metadata. Strips path separators
/// and leading dots so a sender cannot redirect writes outside the inbox
/// subdirectory. Falls back to `<distribution_id>.bin` when metadata has
/// no usable name.
fn inbox_filename(metadata: &BlobMetadata, distribution_id: &str) -> String {
if let Some(raw) = metadata.name.as_ref() {
let last = Path::new(raw)
.file_name()
.and_then(|s| s.to_str())
.map(|s| s.trim_start_matches('.'))
.filter(|s| !s.is_empty());
if let Some(name) = last {
return name.to_string();
/// Resolve the sender-provided `blob_metadata.name` into a safe path **relative
/// to the inbox root**, preserving subdirectories so the inbox mirrors the
/// sender's layout (`outbox/sub/demo.txt` → `inbox/sub/demo.txt`). Faithfully
/// mirrors `peat-node::attachments::inbox::inbox_relpath`.
///
/// Path-traversal guard: the sender controls `name`, so only `Normal` path
/// components are accepted. Any absolute path, `..`, root, or drive-prefix
/// component returns `None`, and the caller falls back to a flat
/// `<distribution_id>.bin` at the inbox root — a malicious or malformed name
/// can never write outside the inbox. Returns `None` for a missing/empty name
/// or one that sanitises to nothing.
fn inbox_relpath(metadata: &BlobMetadata) -> Option<PathBuf> {
use std::path::Component;

let raw = metadata.name.as_deref()?;
if raw.is_empty() {
return None;
}
let mut safe = PathBuf::new();
for comp in Path::new(raw).components() {
match comp {
Component::Normal(c) => safe.push(c),
Component::CurDir => {} // "." — ignore
// ".." (ParentDir), "/" (RootDir), or a Windows prefix — reject the
// whole name rather than try to repair it.
_ => return None,
}
}
format!("{distribution_id}.bin")
if safe.as_os_str().is_empty() {
None
} else {
Some(safe)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -472,25 +506,124 @@ mod tests {
assert_eq!(parse_priority("unknown").unwrap_err().exit_code(), 4);
}

fn doc_with(distribution_id: &str, blob_size: u64, name: Option<&str>) -> DistributionDocument {
DistributionDocument {
distribution_id: distribution_id.to_string(),
blob_hash: "deadbeef".to_string(),
blob_size,
blob_metadata: BlobMetadata {
name: name.map(String::from),
content_type: None,
custom: std::collections::HashMap::new(),
},
scope: DistributionScope::AllNodes,
priority: TransferPriority::Normal,
target_nodes: vec![],
started_at: chrono::Utc::now(),
status: "distributing".to_string(),
cancelled_at: None,
collection: None,
node_statuses: std::collections::HashMap::new(),
}
}

#[test]
fn inbox_filename_uses_name() {
let meta = BlobMetadata::with_name("report.pdf".to_string());
assert_eq!(inbox_filename(&meta, "dist-X"), "report.pdf");
fn inbox_relpath_preserves_subdirs() {
assert_eq!(
inbox_relpath(&BlobMetadata::with_name("report.pdf".to_string())),
Some(PathBuf::from("report.pdf"))
);
assert_eq!(
inbox_relpath(&BlobMetadata::with_name("sub/dir/report.pdf".to_string())),
Some(PathBuf::from("sub/dir/report.pdf"))
);
}

#[test]
fn inbox_filename_strips_path() {
let meta = BlobMetadata::with_name("/etc/passwd".to_string());
assert_eq!(inbox_filename(&meta, "dist-X"), "passwd");
fn inbox_relpath_rejects_traversal_and_absolute() {
// Sender-controlled name must never escape the inbox — reject so the
// caller uses the `<distribution_id>.bin` fallback.
assert_eq!(
inbox_relpath(&BlobMetadata::with_name("../../etc/passwd".to_string())),
None
);
assert_eq!(
inbox_relpath(&BlobMetadata::with_name("/etc/passwd".to_string())),
None
);
assert_eq!(
inbox_relpath(&BlobMetadata::with_name("a/../../b".to_string())),
None
);
}

#[test]
fn inbox_filename_fallback() {
let meta = BlobMetadata {
fn inbox_relpath_none_for_missing_or_empty() {
let none = BlobMetadata {
name: None,
content_type: None,
custom: std::collections::HashMap::new(),
};
assert_eq!(inbox_filename(&meta, "dist-X"), "dist-X.bin");
assert_eq!(inbox_relpath(&none), None);
assert_eq!(inbox_relpath(&BlobMetadata::with_name(String::new())), None);
}

/// Regression guard for #173 / the peat-cli↔peat-node inbox-layout
/// divergence: `deliver()` must mirror the sender's relative path, NOT nest
/// the file under a `{distribution_id}/` directory like the pre-#173 sink.
#[tokio::test]
async fn deliver_mirrors_relative_path_not_distribution_id_dir() {
let inbox = tempfile::tempdir().unwrap();
let src = tempfile::NamedTempFile::new().unwrap();
std::fs::write(src.path(), b"hello world").unwrap();
let sink = InboxSink::new(inbox.path().to_path_buf());
let doc = doc_with("dist-Y", 11, Some("sub/dir/greeting.txt"));

sink.deliver(&doc, src.path()).await.unwrap();

let landed = inbox.path().join("sub").join("dir").join("greeting.txt");
assert_eq!(std::fs::read(&landed).unwrap(), b"hello world");
// The old layout nested under {distribution_id}/ — assert it does NOT.
assert!(
!inbox.path().join("dist-Y").exists(),
"delivery must not nest under a distribution_id directory"
);
assert!(sink.already_delivered(&doc).await);

// Re-deliver overwrites (latest-wins); no partial left behind.
sink.deliver(&doc, src.path()).await.unwrap();
assert_eq!(std::fs::read(&landed).unwrap(), b"hello world");
assert!(!inbox
.path()
.join("sub")
.join("dir")
.join(".greeting.txt.partial")
.exists());
}

#[tokio::test]
async fn deliver_traversal_name_falls_back_inside_inbox() {
let inbox = tempfile::tempdir().unwrap();
let src = tempfile::NamedTempFile::new().unwrap();
std::fs::write(src.path(), b"x").unwrap();
let sink = InboxSink::new(inbox.path().to_path_buf());
// A hostile name resolves to the flat fallback inside the inbox.
let doc = doc_with("dist-evil", 1, Some("../../../../tmp/pwned"));
sink.deliver(&doc, src.path()).await.unwrap();
assert!(inbox.path().join("dist-evil.bin").is_file());
}

#[tokio::test]
async fn deliver_size_mismatch_bails_without_publishing() {
let inbox = tempfile::tempdir().unwrap();
let src = tempfile::NamedTempFile::new().unwrap();
std::fs::write(src.path(), b"hello world").unwrap(); // 11 bytes
let sink = InboxSink::new(inbox.path().to_path_buf());
let doc = doc_with("dist-Z", 99, Some("x.txt")); // declares 99
assert!(sink.deliver(&doc, src.path()).await.is_err());
assert!(
!inbox.path().join("x.txt").exists(),
"a short file must not be published"
);
}
}
51 changes: 24 additions & 27 deletions crates/peat-cli/tests/e2e/scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,24 +1190,22 @@ async fn attach_send_delivers_file_to_peer_inbox() {
);
assert_eq!(v["completed"], 1, "expected 1 completed target: {stdout}");

// The receive watcher delivers to {inbox}/{dist_id}/{filename}.
let dist_dir = attach_peer.inbox_dir.path().join(&dist_id);
// The receive watcher mirrors the sender's outbox layout: a file sent as
// `payload.txt` lands at {inbox}/payload.txt — NOT {inbox}/{dist_id}/...
let landed = attach_peer.inbox_dir.path().join("payload.txt");
let deadline = std::time::Instant::now() + Duration::from_secs(10);
loop {
if dist_dir.is_dir() {
let entries: Vec<_> = std::fs::read_dir(&dist_dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| !e.file_name().to_str().unwrap_or("").starts_with('.'))
.collect();
if let Some(entry) = entries.first() {
let content = std::fs::read(entry.path()).unwrap();
assert_eq!(
content, b"attach e2e test payload",
"delivered file content mismatch"
);
break;
}
if landed.is_file() {
let content = std::fs::read(&landed).unwrap();
assert_eq!(
content, b"attach e2e test payload",
"delivered file content mismatch"
);
assert!(
!attach_peer.inbox_dir.path().join(&dist_id).exists(),
"delivery must not nest under a distribution_id directory; dist_id={dist_id}"
);
break;
}
assert!(
std::time::Instant::now() < deadline,
Expand Down Expand Up @@ -1286,19 +1284,18 @@ async fn attach_watch_receives_file_from_peer() {
let (_token, handle) = attach_peer.distribute_file(&src, "from_peer.bin").await;
let dist_id = handle.distribution_id.clone();

// Poll CLI's inbox until the file appears.
let inbox_dist_dir = inbox_dir.path().join(&dist_id);
// Poll CLI's inbox until the file appears. The CLI sink mirrors the
// sender's layout: `from_peer.bin` lands at {inbox}/from_peer.bin — NOT
// nested under {inbox}/{dist_id}/.
let landed = inbox_dir.path().join("from_peer.bin");
let deadline = std::time::Instant::now() + Duration::from_secs(15);
let content = loop {
if inbox_dist_dir.is_dir() {
let entries: Vec<_> = std::fs::read_dir(&inbox_dist_dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| !e.file_name().to_str().unwrap_or("").starts_with('.'))
.collect();
if let Some(entry) = entries.first() {
break std::fs::read(entry.path()).unwrap();
}
if landed.is_file() {
assert!(
!inbox_dir.path().join(&dist_id).exists(),
"delivery must not nest under a distribution_id directory; dist_id={dist_id}"
);
break std::fs::read(&landed).unwrap();
}
assert!(
std::time::Instant::now() < deadline,
Expand Down
Loading
Loading