From 18daf0cfe5a8b713e946bb3007c8dfb29115ffd8 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 28 Feb 2025 15:04:13 +0300 Subject: [PATCH 01/11] WIP on mac persistency problem (flock() doesn't always work on mac tmpfs, so need other bsd-specific mechanism) --- commons/zenoh-shm/src/shm/unix.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index c91ad45bb..923534da9 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -48,7 +48,12 @@ impl SegmentImpl { let fd = { let id = Self::id_str(id); let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; + #[cfg(any(bsd, target_os = "redox"))] + let flags = flags | OFlag::O_SHLOCK; + + // todo: these flags probably can be exposed to the config let mode = Mode::S_IRUSR | Mode::S_IWUSR; + tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); match shm_open(id.as_str(), flags, mode) { Ok(v) => v, @@ -202,6 +207,11 @@ impl Drop for SegmentImpl { #[cfg(target_os = "linux")] Self::unlink_if_unique(self.id, &self.fd); + //{ + // let id = Self::id_str(self.id); + // let fd = open("myfile.txt", O_RDWR | O_CREAT | O_EXLOCK | O_NONBLOCK, 0666); + //} + #[cfg(not(target_os = "linux"))] { let id = Self::id_str(self.id); From 586ac85ef5cc50de127d1698d9a73d64e512d2c0 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 28 Feb 2025 17:50:48 +0300 Subject: [PATCH 02/11] Make BSD and non-BSD implementation for shared memory cleanup --- commons/zenoh-shm/src/shm/unix.rs | 86 ++++++++++++++++++++++--------- commons/zenoh-shm/tests/shm.rs | 4 -- 2 files changed, 61 insertions(+), 29 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 923534da9..69b7db6c4 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -12,6 +12,8 @@ // ZettaScale Zenoh Team, // +#[cfg(any(bsd, target_os = "redox"))] +use std::mem::ManuallyDrop; use std::{ ffi::c_void, num::NonZeroUsize, @@ -19,8 +21,8 @@ use std::{ ptr::NonNull, }; -// todo: flock() doesn't work on Mac in some cases, but we can fix it -#[cfg(target_os = "linux")] +// we use flock() on non-BSD systems +#[cfg(not(any(bsd, target_os = "redox")))] use advisory_lock::{AdvisoryFileLock, FileLockMode}; use nix::{ fcntl::OFlag, @@ -34,8 +36,10 @@ use nix::{ use super::{SegmentCreateError, SegmentID, SegmentOpenError, ShmCreateResult, ShmOpenResult}; pub struct SegmentImpl { - #[cfg(target_os = "linux")] + #[cfg(not(any(bsd, target_os = "redox")))] fd: OwnedFd, + #[cfg(any(bsd, target_os = "redox"))] + fd: ManuallyDrop, len: NonZeroUsize, data_ptr: NonNull, id: ID, @@ -48,22 +52,26 @@ impl SegmentImpl { let fd = { let id = Self::id_str(id); let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; + + // open shm file with shared lock (BSD feature) #[cfg(any(bsd, target_os = "redox"))] - let flags = flags | OFlag::O_SHLOCK; - + let flags = flags | OFlag::O_SHLOCK | OFlag::O_NONBLOCK; + // todo: these flags probably can be exposed to the config let mode = Mode::S_IRUSR | Mode::S_IWUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); match shm_open(id.as_str(), flags, mode) { Ok(v) => v, + #[cfg(any(bsd, target_os = "redox"))] + Err(nix::Error::EWOULDBLOCK) => return Err(SegmentCreateError::SegmentExists), Err(nix::Error::EEXIST) => return Err(SegmentCreateError::SegmentExists), Err(e) => return Err(SegmentCreateError::OsError(e as u32)), } }; - // todo: flock() doesn't work on Mac in some cases, but we can fix it - #[cfg(target_os = "linux")] + // we use flock() on non-BSD systems + #[cfg(not(any(bsd, target_os = "redox")))] // put shared advisory lock on shm fd fd.as_raw_fd() .try_lock(FileLockMode::Shared) @@ -88,7 +96,6 @@ impl SegmentImpl { let data_ptr = Self::map(len, &fd).map_err(|e| SegmentCreateError::OsError(e as _))?; Ok(Self { - #[cfg(target_os = "linux")] fd, len, data_ptr, @@ -101,13 +108,21 @@ impl SegmentImpl { let fd = { let id = Self::id_str(id); let flags = OFlag::O_RDWR; + #[cfg(any(bsd, target_os = "redox"))] + let flags = flags | OFlag::O_SHLOCK | OFlag::O_NONBLOCK; let mode = Mode::S_IRUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); - shm_open(id.as_str(), flags, mode).map_err(|e| SegmentOpenError::OsError(e as u32))? + + match shm_open(id.as_str(), flags, mode) { + Ok(v) => v, + #[cfg(any(bsd, target_os = "redox"))] + Err(nix::Error::EWOULDBLOCK) => return Err(SegmentOpenError::InvalidatedSegment), + Err(e) => return Err(SegmentOpenError::OsError(e as u32)), + } }; - // todo: flock() doesn't work on Mac in some cases, but we can fix it - #[cfg(target_os = "linux")] + // we use flock() on non-BSD systems + #[cfg(not(any(bsd, target_os = "redox")))] // put shared advisory lock on shm fd fd.as_raw_fd() .try_lock(FileLockMode::Shared) @@ -128,7 +143,6 @@ impl SegmentImpl { let data_ptr = Self::map(len, &fd).map_err(|e| SegmentOpenError::OsError(e as _))?; Ok(Self { - #[cfg(target_os = "linux")] fd, len, data_ptr, @@ -141,11 +155,22 @@ impl SegmentImpl { let fd = { let id = Self::id_str(id); let flags = OFlag::O_RDWR; + #[cfg(any(bsd, target_os = "redox"))] + let flags = flags | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; let mode = Mode::S_IRUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); shm_open(id.as_str(), flags, mode) }; + #[cfg(any(bsd, target_os = "redox"))] + // sussessful open means that we are the last owner of this file - unlink it + if fd.is_ok() { + let id = Self::id_str(id); + tracing::trace!("shm_unlink(name={})", id); + let _ = shm_unlink(id.as_str()); + } + + #[cfg(not(any(bsd, target_os = "redox")))] if let Ok(fd) = fd { Self::unlink_if_unique(id, &fd); } @@ -185,10 +210,9 @@ impl SegmentImpl { unsafe { mmap(None, len, prot, flags, fd, 0) } } - #[allow(unused_variables)] + // we use flock() on non-BSD systems + #[cfg(not(any(bsd, target_os = "redox")))] fn unlink_if_unique(id: ID, fd: &OwnedFd) { - // todo: flock() doesn't work on Mac in some cases, but we can fix it - #[cfg(target_os = "linux")] if fd.as_raw_fd().try_lock(FileLockMode::Exclusive).is_ok() { let id = Self::id_str(id); tracing::trace!("shm_unlink(name={})", id); @@ -204,19 +228,31 @@ impl Drop for SegmentImpl { tracing::debug!("munmap() failed : {}", _e); }; - #[cfg(target_os = "linux")] + #[cfg(not(any(bsd, target_os = "redox")))] Self::unlink_if_unique(self.id, &self.fd); - //{ - // let id = Self::id_str(self.id); - // let fd = open("myfile.txt", O_RDWR | O_CREAT | O_EXLOCK | O_NONBLOCK, 0666); - //} - - #[cfg(not(target_os = "linux"))] + #[cfg(any(bsd, target_os = "redox"))] { - let id = Self::id_str(self.id); - tracing::trace!("shm_unlink(name={})", id); - let _ = shm_unlink(id.as_str()); + // drop file descriptor to release O_SHLOCK we hold + let fd = unsafe { ManuallyDrop::take(&mut self.fd) }; + drop(fd); + + // generate shm id string + let id = Self::id_str(id); + + // try to open shm fd with O_EXLOCK + let fd = { + let flags = OFlag::O_RDWR | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; + let mode = Mode::S_IRUSR; + tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); + shm_open(id.as_str(), flags, mode) + }; + + // sussessful open means that we are the last owner of this file - unlink it + if fd.is_ok() { + tracing::trace!("shm_unlink(name={})", id); + let _ = shm_unlink(id.as_str()); + } } } } diff --git a/commons/zenoh-shm/tests/shm.rs b/commons/zenoh-shm/tests/shm.rs index f37327ba1..39c925f6a 100644 --- a/commons/zenoh-shm/tests/shm.rs +++ b/commons/zenoh-shm/tests/shm.rs @@ -53,8 +53,6 @@ fn create_and_open_amd_reopen() { assert!(opened_segment2.len() >= len); } -// todo: flock() doesn't work on Mac in some cases, but we can fix it -#[cfg(not(target_os = "macos"))] #[test] fn create_and_open_amd_reopen_and_open_closed() { let id = line!(); @@ -70,8 +68,6 @@ fn create_and_open_amd_reopen_and_open_closed() { assert!(opened_segment2.len() >= len); } -// todo: flock() doesn't work on Mac in some cases, but we can fix it -#[cfg(not(target_os = "macos"))] #[test] fn no_persistency() { let id = line!(); From d765bd5cc0656e00680b7b0f0172f5142032c011 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 28 Feb 2025 18:18:16 +0300 Subject: [PATCH 03/11] Add config aliases to properly detect BSD systems --- Cargo.lock | 1 + commons/zenoh-shm/Cargo.toml | 3 +++ commons/zenoh-shm/build.rs | 30 ++++++++++++++++++++++++++++++ 3 files changed, 34 insertions(+) create mode 100644 commons/zenoh-shm/build.rs diff --git a/Cargo.lock b/Cargo.lock index a9ef4c009..fc640213e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5687,6 +5687,7 @@ version = "1.2.1" dependencies = [ "advisory-lock", "async-trait", + "cfg_aliases 0.2.1", "crossbeam-queue", "libc", "nix 0.29.0", diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 24e0ac066..cd49b1f40 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -57,3 +57,6 @@ winapi = { workspace = true } [dev-dependencies] libc = { workspace = true } + +[build-dependencies] +cfg_aliases = "0.2.1" \ No newline at end of file diff --git a/commons/zenoh-shm/build.rs b/commons/zenoh-shm/build.rs new file mode 100644 index 000000000..511881a09 --- /dev/null +++ b/commons/zenoh-shm/build.rs @@ -0,0 +1,30 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // these aliases should at least be included in the same aliases of Nix crate: + // ___________________ + // | | + // | Nix aliases | + // | ___________ | + // | | Our | | + // | | aliases | | + // | |_________| | + // |_________________| + cfg_aliases! { + dragonfly: { target_os = "dragonfly" }, + ios: { target_os = "ios" }, + freebsd: { target_os = "freebsd" }, + macos: { target_os = "macos" }, + netbsd: { target_os = "netbsd" }, + openbsd: { target_os = "openbsd" }, + watchos: { target_os = "watchos" }, + tvos: { target_os = "tvos" }, + visionos: { target_os = "visionos" }, + + apple_targets: { any(ios, macos, watchos, tvos, visionos) }, + bsd: { any(freebsd, dragonfly, netbsd, openbsd, apple_targets) }, + } + + println!("cargo:rustc-check-cfg=cfg(apple_targets)"); + println!("cargo:rustc-check-cfg=cfg(bsd)"); +} From a663b5af6f5a3e1ef12550f0e3eb1b72369be4d1 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 28 Feb 2025 18:25:46 +0300 Subject: [PATCH 04/11] - fix CI - add comments --- commons/zenoh-shm/src/shm/unix.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 69b7db6c4..1db9b2818 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -95,6 +95,10 @@ impl SegmentImpl { // map segment into our address space let data_ptr = Self::map(len, &fd).map_err(|e| SegmentCreateError::OsError(e as _))?; + // be careful!!! + #[cfg(any(bsd, target_os = "redox"))] + let fd = ManuallyDrop::new(fd); + Ok(Self { fd, len, @@ -108,11 +112,15 @@ impl SegmentImpl { let fd = { let id = Self::id_str(id); let flags = OFlag::O_RDWR; + + // open shm file with shared lock (BSD feature) #[cfg(any(bsd, target_os = "redox"))] let flags = flags | OFlag::O_SHLOCK | OFlag::O_NONBLOCK; + + // todo: these flags probably can be exposed to the config let mode = Mode::S_IRUSR; - tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); + tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); match shm_open(id.as_str(), flags, mode) { Ok(v) => v, #[cfg(any(bsd, target_os = "redox"))] @@ -142,6 +150,10 @@ impl SegmentImpl { // map segment into our address space let data_ptr = Self::map(len, &fd).map_err(|e| SegmentOpenError::OsError(e as _))?; + // be careful!!! + #[cfg(any(bsd, target_os = "redox"))] + let fd = ManuallyDrop::new(fd); + Ok(Self { fd, len, @@ -155,8 +167,11 @@ impl SegmentImpl { let fd = { let id = Self::id_str(id); let flags = OFlag::O_RDWR; + + // open shm file with exclusive lock (BSD feature) #[cfg(any(bsd, target_os = "redox"))] let flags = flags | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; + let mode = Mode::S_IRUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); shm_open(id.as_str(), flags, mode) @@ -238,7 +253,7 @@ impl Drop for SegmentImpl { drop(fd); // generate shm id string - let id = Self::id_str(id); + let id = Self::id_str(self.id); // try to open shm fd with O_EXLOCK let fd = { From 721319df89a28c634e20588e59c43e734dd4c24c Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 28 Feb 2025 18:53:04 +0300 Subject: [PATCH 05/11] Update unix.rs --- commons/zenoh-shm/src/shm/unix.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 1db9b2818..808776611 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -118,7 +118,7 @@ impl SegmentImpl { let flags = flags | OFlag::O_SHLOCK | OFlag::O_NONBLOCK; // todo: these flags probably can be exposed to the config - let mode = Mode::S_IRUSR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); match shm_open(id.as_str(), flags, mode) { @@ -172,7 +172,7 @@ impl SegmentImpl { #[cfg(any(bsd, target_os = "redox"))] let flags = flags | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; - let mode = Mode::S_IRUSR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); shm_open(id.as_str(), flags, mode) }; @@ -239,8 +239,8 @@ impl SegmentImpl { impl Drop for SegmentImpl { fn drop(&mut self) { tracing::trace!("munmap(addr={:p},len={})", self.data_ptr, self.len); - if let Err(_e) = unsafe { munmap(self.data_ptr, self.len.get()) } { - tracing::debug!("munmap() failed : {}", _e); + if let Err(e) = unsafe { munmap(self.data_ptr, self.len.get()) } { + tracing::debug!("munmap() failed : {}", e); }; #[cfg(not(any(bsd, target_os = "redox")))] @@ -258,7 +258,7 @@ impl Drop for SegmentImpl { // try to open shm fd with O_EXLOCK let fd = { let flags = OFlag::O_RDWR | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; - let mode = Mode::S_IRUSR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); shm_open(id.as_str(), flags, mode) }; From 73089b21e8afd54eb2947c121154604e7e2788cb Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 5 Mar 2025 11:44:48 +0300 Subject: [PATCH 06/11] Apply advisory locking on non-tmpfs for BSD --- commons/zenoh-shm/src/posix_shm/cleanup.rs | 3 + commons/zenoh-shm/src/shm/mod.rs | 2 +- commons/zenoh-shm/src/shm/unix.rs | 143 +++++++-------------- 3 files changed, 54 insertions(+), 94 deletions(-) diff --git a/commons/zenoh-shm/src/posix_shm/cleanup.rs b/commons/zenoh-shm/src/posix_shm/cleanup.rs index 4044251d3..c4c5cf9af 100644 --- a/commons/zenoh-shm/src/posix_shm/cleanup.rs +++ b/commons/zenoh-shm/src/posix_shm/cleanup.rs @@ -34,6 +34,9 @@ mod platform { } fn cleanup_orphaned_segments_inner() -> ZResult<()> { + #[cfg(any(bsd, target_os = "redox"))] + let shm_files = fs::read_dir(std::env::temp_dir())?; + #[cfg(not(any(bsd, target_os = "redox")))] let shm_files = fs::read_dir("/dev/shm")?; for segment_file in shm_files.filter_map(Result::ok).filter(|f| { diff --git a/commons/zenoh-shm/src/shm/mod.rs b/commons/zenoh-shm/src/shm/mod.rs index 09e37601c..e99f4886b 100644 --- a/commons/zenoh-shm/src/shm/mod.rs +++ b/commons/zenoh-shm/src/shm/mod.rs @@ -64,7 +64,7 @@ impl Segment { } pub fn ensure_not_persistent(id: ID) { - platform::SegmentImpl::ensure_not_persistent(id); + let _ = platform::SegmentImpl::open(id); } } diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 808776611..3d3a3236a 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -24,6 +24,8 @@ use std::{ // we use flock() on non-BSD systems #[cfg(not(any(bsd, target_os = "redox")))] use advisory_lock::{AdvisoryFileLock, FileLockMode}; +#[cfg(any(bsd, target_os = "redox"))] +use nix::fcntl::open; use nix::{ fcntl::OFlag, sys::{ @@ -36,10 +38,7 @@ use nix::{ use super::{SegmentCreateError, SegmentID, SegmentOpenError, ShmCreateResult, ShmOpenResult}; pub struct SegmentImpl { - #[cfg(not(any(bsd, target_os = "redox")))] - fd: OwnedFd, - #[cfg(any(bsd, target_os = "redox"))] - fd: ManuallyDrop, + lock_fd: OwnedFd, len: NonZeroUsize, data_ptr: NonNull, id: ID, @@ -48,32 +47,40 @@ pub struct SegmentImpl { // PUBLIC impl SegmentImpl { pub fn create(id: ID, len: NonZeroUsize) -> ShmCreateResult { + // we use separate lockfile on non-tmpfs for bsd + #[cfg(any(bsd, target_os = "redox"))] + let lock_fd = { + let lockpath = std::env::temp_dir().join(Self::id_str(id)); + let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + open(&lockpath, flags, mode).map_err(|_| SegmentCreateError::SegmentExists) + }?; + // create unique shm fd let fd = { let id = Self::id_str(id); let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; - // open shm file with shared lock (BSD feature) - #[cfg(any(bsd, target_os = "redox"))] - let flags = flags | OFlag::O_SHLOCK | OFlag::O_NONBLOCK; - // todo: these flags probably can be exposed to the config let mode = Mode::S_IRUSR | Mode::S_IWUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); match shm_open(id.as_str(), flags, mode) { Ok(v) => v, - #[cfg(any(bsd, target_os = "redox"))] - Err(nix::Error::EWOULDBLOCK) => return Err(SegmentCreateError::SegmentExists), Err(nix::Error::EEXIST) => return Err(SegmentCreateError::SegmentExists), Err(e) => return Err(SegmentCreateError::OsError(e as u32)), } }; - // we use flock() on non-BSD systems + // on non-bsd we use our SHM file also for locking #[cfg(not(any(bsd, target_os = "redox")))] - // put shared advisory lock on shm fd - fd.as_raw_fd() + let lock_fd = fd; + #[cfg(not(any(bsd, target_os = "redox")))] + let fd = &lock_fd; + + // put shared advisory lock on lock_fd + lock_fd + .as_raw_fd() .try_lock(FileLockMode::Shared) .map_err(|e| match e { advisory_lock::FileLockError::AlreadyLocked => SegmentCreateError::SegmentExists, @@ -95,12 +102,8 @@ impl SegmentImpl { // map segment into our address space let data_ptr = Self::map(len, &fd).map_err(|e| SegmentCreateError::OsError(e as _))?; - // be careful!!! - #[cfg(any(bsd, target_os = "redox"))] - let fd = ManuallyDrop::new(fd); - Ok(Self { - fd, + lock_fd, len, data_ptr, id, @@ -108,31 +111,39 @@ impl SegmentImpl { } pub fn open(id: ID) -> ShmOpenResult { + // we use separate lockfile on non-tmpfs for bsd + #[cfg(any(bsd, target_os = "redox"))] + let lock_fd = { + let lockpath = std::env::temp_dir().join(Self::id_str(id)); + let flags = OFlag::O_RDWR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + open(&lockpath, flags, mode).map_err(|_| SegmentOpenError::InvalidatedSegment) + }?; + // open shm fd let fd = { let id = Self::id_str(id); let flags = OFlag::O_RDWR; - // open shm file with shared lock (BSD feature) - #[cfg(any(bsd, target_os = "redox"))] - let flags = flags | OFlag::O_SHLOCK | OFlag::O_NONBLOCK; - // todo: these flags probably can be exposed to the config let mode = Mode::S_IRUSR | Mode::S_IWUSR; tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); match shm_open(id.as_str(), flags, mode) { Ok(v) => v, - #[cfg(any(bsd, target_os = "redox"))] - Err(nix::Error::EWOULDBLOCK) => return Err(SegmentOpenError::InvalidatedSegment), Err(e) => return Err(SegmentOpenError::OsError(e as u32)), } }; - // we use flock() on non-BSD systems + // on non-bsd we use our SHM file also for locking + #[cfg(not(any(bsd, target_os = "redox")))] + let lock_fd = fd; #[cfg(not(any(bsd, target_os = "redox")))] - // put shared advisory lock on shm fd - fd.as_raw_fd() + let fd = &lock_fd; + + // put shared advisory lock on lock_fd + lock_fd + .as_raw_fd() .try_lock(FileLockMode::Shared) .map_err(|e| match e { advisory_lock::FileLockError::AlreadyLocked => SegmentOpenError::InvalidatedSegment, @@ -150,47 +161,14 @@ impl SegmentImpl { // map segment into our address space let data_ptr = Self::map(len, &fd).map_err(|e| SegmentOpenError::OsError(e as _))?; - // be careful!!! - #[cfg(any(bsd, target_os = "redox"))] - let fd = ManuallyDrop::new(fd); - Ok(Self { - fd, + lock_fd, len, data_ptr, id, }) } - pub fn ensure_not_persistent(id: ID) { - // open shm fd - let fd = { - let id = Self::id_str(id); - let flags = OFlag::O_RDWR; - - // open shm file with exclusive lock (BSD feature) - #[cfg(any(bsd, target_os = "redox"))] - let flags = flags | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; - - let mode = Mode::S_IRUSR | Mode::S_IWUSR; - tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); - shm_open(id.as_str(), flags, mode) - }; - - #[cfg(any(bsd, target_os = "redox"))] - // sussessful open means that we are the last owner of this file - unlink it - if fd.is_ok() { - let id = Self::id_str(id); - tracing::trace!("shm_unlink(name={})", id); - let _ = shm_unlink(id.as_str()); - } - - #[cfg(not(any(bsd, target_os = "redox")))] - if let Ok(fd) = fd { - Self::unlink_if_unique(id, &fd); - } - } - pub fn id(&self) -> ID { self.id } @@ -224,16 +202,6 @@ impl SegmentImpl { unsafe { mmap(None, len, prot, flags, fd, 0) } } - - // we use flock() on non-BSD systems - #[cfg(not(any(bsd, target_os = "redox")))] - fn unlink_if_unique(id: ID, fd: &OwnedFd) { - if fd.as_raw_fd().try_lock(FileLockMode::Exclusive).is_ok() { - let id = Self::id_str(id); - tracing::trace!("shm_unlink(name={})", id); - let _ = shm_unlink(id.as_str()); - } - } } impl Drop for SegmentImpl { @@ -243,30 +211,19 @@ impl Drop for SegmentImpl { tracing::debug!("munmap() failed : {}", e); }; - #[cfg(not(any(bsd, target_os = "redox")))] - Self::unlink_if_unique(self.id, &self.fd); - - #[cfg(any(bsd, target_os = "redox"))] + if self + .lock_fd + .as_raw_fd() + .try_lock(FileLockMode::Exclusive) + .is_ok() { - // drop file descriptor to release O_SHLOCK we hold - let fd = unsafe { ManuallyDrop::take(&mut self.fd) }; - drop(fd); - - // generate shm id string let id = Self::id_str(self.id); - - // try to open shm fd with O_EXLOCK - let fd = { - let flags = OFlag::O_RDWR | OFlag::O_EXLOCK | OFlag::O_NONBLOCK; - let mode = Mode::S_IRUSR | Mode::S_IWUSR; - tracing::trace!("shm_open(name={}, flag={:?}, mode={:?})", id, flags, mode); - shm_open(id.as_str(), flags, mode) - }; - - // sussessful open means that we are the last owner of this file - unlink it - if fd.is_ok() { - tracing::trace!("shm_unlink(name={})", id); - let _ = shm_unlink(id.as_str()); + tracing::trace!("shm_unlink(name={})", id); + let _ = shm_unlink(id.as_str()); + #[cfg(any(bsd, target_os = "redox"))] + { + let lockpath = std::env::temp_dir().join(id); + let _ = std::fs::remove_file(lockpath); } } } From 9fa82214bf821b6eebb6e5af975a22d907723cbe Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 5 Mar 2025 12:00:30 +0300 Subject: [PATCH 07/11] fix issues on mac --- commons/zenoh-shm/src/shm/unix.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 3d3a3236a..31c6b7d21 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -12,8 +12,6 @@ // ZettaScale Zenoh Team, // -#[cfg(any(bsd, target_os = "redox"))] -use std::mem::ManuallyDrop; use std::{ ffi::c_void, num::NonZeroUsize, @@ -21,8 +19,6 @@ use std::{ ptr::NonNull, }; -// we use flock() on non-BSD systems -#[cfg(not(any(bsd, target_os = "redox")))] use advisory_lock::{AdvisoryFileLock, FileLockMode}; #[cfg(any(bsd, target_os = "redox"))] use nix::fcntl::open; @@ -78,6 +74,9 @@ impl SegmentImpl { #[cfg(not(any(bsd, target_os = "redox")))] let fd = &lock_fd; + #[cfg(any(bsd, target_os = "redox"))] + let fd = &fd; + // put shared advisory lock on lock_fd lock_fd .as_raw_fd() @@ -91,7 +90,7 @@ impl SegmentImpl { // resize shm segment to requested size tracing::trace!("ftruncate(fd={}, len={})", fd.as_raw_fd(), len); - ftruncate(&fd, len.get() as _).map_err(|e| SegmentCreateError::OsError(e as u32))?; + ftruncate(fd, len.get() as _).map_err(|e| SegmentCreateError::OsError(e as u32))?; // get real segment size let len = { @@ -100,7 +99,7 @@ impl SegmentImpl { }; // map segment into our address space - let data_ptr = Self::map(len, &fd).map_err(|e| SegmentCreateError::OsError(e as _))?; + let data_ptr = Self::map(len, fd).map_err(|e| SegmentCreateError::OsError(e as _))?; Ok(Self { lock_fd, @@ -141,6 +140,9 @@ impl SegmentImpl { #[cfg(not(any(bsd, target_os = "redox")))] let fd = &lock_fd; + #[cfg(any(bsd, target_os = "redox"))] + let fd = &fd; + // put shared advisory lock on lock_fd lock_fd .as_raw_fd() @@ -159,7 +161,7 @@ impl SegmentImpl { }; // map segment into our address space - let data_ptr = Self::map(len, &fd).map_err(|e| SegmentOpenError::OsError(e as _))?; + let data_ptr = Self::map(len, fd).map_err(|e| SegmentOpenError::OsError(e as _))?; Ok(Self { lock_fd, From 278687a7c55dd4fdeb0945dbf2bd9db2cb91639a Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 5 Mar 2025 12:52:51 +0300 Subject: [PATCH 08/11] Update unix.rs --- commons/zenoh-shm/src/shm/unix.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 31c6b7d21..d04261410 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -45,12 +45,15 @@ impl SegmentImpl { pub fn create(id: ID, len: NonZeroUsize) -> ShmCreateResult { // we use separate lockfile on non-tmpfs for bsd #[cfg(any(bsd, target_os = "redox"))] - let lock_fd = { - let lockpath = std::env::temp_dir().join(Self::id_str(id)); - let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; - let mode = Mode::S_IRUSR | Mode::S_IWUSR; - open(&lockpath, flags, mode).map_err(|_| SegmentCreateError::SegmentExists) - }?; + let lock_fd: OwnedFd = unsafe { + { + let lockpath = std::env::temp_dir().join(Self::id_str(id)); + let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + open(&lockpath, flags, mode).map_err(|_| SegmentCreateError::SegmentExists) + }? + .into() + }; // create unique shm fd let fd = { @@ -112,12 +115,14 @@ impl SegmentImpl { pub fn open(id: ID) -> ShmOpenResult { // we use separate lockfile on non-tmpfs for bsd #[cfg(any(bsd, target_os = "redox"))] - let lock_fd = { - let lockpath = std::env::temp_dir().join(Self::id_str(id)); - let flags = OFlag::O_RDWR; - let mode = Mode::S_IRUSR | Mode::S_IWUSR; - open(&lockpath, flags, mode).map_err(|_| SegmentOpenError::InvalidatedSegment) - }?; + let lock_fd: OwnedFd = unsafe { + { + let lockpath = std::env::temp_dir().join(Self::id_str(id)); + let flags = OFlag::O_RDWR; + let mode = Mode::S_IRUSR | Mode::S_IWUSR; + open(&lockpath, flags, mode).map_err(|_| SegmentOpenError::InvalidatedSegment) + }? + }; // open shm fd let fd = { From 355170c1681c361f7f97b3ec154cd1dd74bee85b Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 5 Mar 2025 13:09:37 +0300 Subject: [PATCH 09/11] Update unix.rs --- commons/zenoh-shm/src/shm/unix.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index d04261410..8e5081564 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -12,6 +12,8 @@ // ZettaScale Zenoh Team, // +#[cfg(any(bsd, target_os = "redox"))] +use std::os::fd::FromRawFd; use std::{ ffi::c_void, num::NonZeroUsize, @@ -45,14 +47,13 @@ impl SegmentImpl { pub fn create(id: ID, len: NonZeroUsize) -> ShmCreateResult { // we use separate lockfile on non-tmpfs for bsd #[cfg(any(bsd, target_os = "redox"))] - let lock_fd: OwnedFd = unsafe { - { + let lock_fd = unsafe { + OwnedFd::from_raw_fd({ let lockpath = std::env::temp_dir().join(Self::id_str(id)); let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; let mode = Mode::S_IRUSR | Mode::S_IWUSR; open(&lockpath, flags, mode).map_err(|_| SegmentCreateError::SegmentExists) - }? - .into() + }?) }; // create unique shm fd @@ -115,13 +116,13 @@ impl SegmentImpl { pub fn open(id: ID) -> ShmOpenResult { // we use separate lockfile on non-tmpfs for bsd #[cfg(any(bsd, target_os = "redox"))] - let lock_fd: OwnedFd = unsafe { - { + let lock_fd = unsafe { + OwnedFd::from_raw_fd({ let lockpath = std::env::temp_dir().join(Self::id_str(id)); let flags = OFlag::O_RDWR; let mode = Mode::S_IRUSR | Mode::S_IWUSR; open(&lockpath, flags, mode).map_err(|_| SegmentOpenError::InvalidatedSegment) - }? + }?) }; // open shm fd From fabfa0fb973a51612dd0829cb5175c799326869b Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 5 Mar 2025 13:43:34 +0300 Subject: [PATCH 10/11] Update windows.rs --- commons/zenoh-shm/src/shm/windows.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/commons/zenoh-shm/src/shm/windows.rs b/commons/zenoh-shm/src/shm/windows.rs index ec5ae68b9..55134ff52 100644 --- a/commons/zenoh-shm/src/shm/windows.rs +++ b/commons/zenoh-shm/src/shm/windows.rs @@ -109,8 +109,6 @@ impl SegmentImpl { }) } - pub fn ensure_not_persistent(_id: ID) {} - pub fn id(&self) -> ID { self.id } From 68febcf16768f616a7b0fd7527fa7105337bbf4d Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 5 Mar 2025 15:28:12 +0300 Subject: [PATCH 11/11] Update unix.rs and windows.rs --- commons/zenoh-shm/src/shm/unix.rs | 9 ++++++--- commons/zenoh-shm/src/shm/windows.rs | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/commons/zenoh-shm/src/shm/unix.rs b/commons/zenoh-shm/src/shm/unix.rs index 8e5081564..3fa4aea32 100644 --- a/commons/zenoh-shm/src/shm/unix.rs +++ b/commons/zenoh-shm/src/shm/unix.rs @@ -52,7 +52,10 @@ impl SegmentImpl { let lockpath = std::env::temp_dir().join(Self::id_str(id)); let flags = OFlag::O_CREAT | OFlag::O_EXCL | OFlag::O_RDWR; let mode = Mode::S_IRUSR | Mode::S_IWUSR; - open(&lockpath, flags, mode).map_err(|_| SegmentCreateError::SegmentExists) + open(&lockpath, flags, mode).map_err(|e| match e { + nix::Error::EEXIST => SegmentCreateError::SegmentExists, + e => SegmentCreateError::OsError(e as u32), + }) }?) }; @@ -121,7 +124,7 @@ impl SegmentImpl { let lockpath = std::env::temp_dir().join(Self::id_str(id)); let flags = OFlag::O_RDWR; let mode = Mode::S_IRUSR | Mode::S_IWUSR; - open(&lockpath, flags, mode).map_err(|_| SegmentOpenError::InvalidatedSegment) + open(&lockpath, flags, mode).map_err(|e| SegmentOpenError::OsError(e as _)) }?) }; @@ -193,7 +196,7 @@ impl SegmentImpl { // PRIVATE impl SegmentImpl { fn id_str(id: ID) -> String { - format!("/{}.zenoh", id) + format!("{id}.zenoh") } fn map(len: NonZeroUsize, fd: &OwnedFd) -> nix::Result> { diff --git a/commons/zenoh-shm/src/shm/windows.rs b/commons/zenoh-shm/src/shm/windows.rs index 55134ff52..0b4d3f5ba 100644 --- a/commons/zenoh-shm/src/shm/windows.rs +++ b/commons/zenoh-shm/src/shm/windows.rs @@ -125,7 +125,7 @@ impl SegmentImpl { // PRIVATE impl SegmentImpl { fn id_str(id: ID) -> String { - format!("/{}.zenoh", id) + format!("{}.zenoh", id) } fn map(fd: &FileMapping) -> Result<(ViewOfFile, usize), Error> {