From 6e71efb44dddce41f575874bca6d5d4cd5f2f492 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Sat, 30 Nov 2024 13:14:39 +0100
Subject: [PATCH] Reintroduce the prepare_for_closing function and Env/EncryptedEnv: Clone

---
 examples/heed3-encrypted.rs       |  8 ++---
 heed/Cargo.toml                   |  1 +
 heed/src/envs/encrypted_env.rs    | 14 ++++++--
 heed/src/envs/env.rs              | 55 +++++++++++++++++++++++--------
 heed/src/envs/env_open_options.rs |  6 ++--
 heed/src/envs/mod.rs              | 36 +++++++++++++++++++--
 heed/src/lib.rs                   |  4 +--
 heed/src/txn.rs                   | 35 ++++++++++----------
 heed3/Cargo.toml                  |  1 +
 9 files changed, 116 insertions(+), 44 deletions(-)

diff --git a/examples/heed3-encrypted.rs b/examples/heed3-encrypted.rs
index acb3c671..7f7b9332 100644
--- a/examples/heed3-encrypted.rs
+++ b/examples/heed3-encrypted.rs
@@ -5,7 +5,7 @@ use std::path::Path;
 use argon2::Argon2;
 use chacha20poly1305::{ChaCha20Poly1305, Key};
 use heed3::types::*;
-use heed3::{Database, EnvOpenOptions};
+use heed3::EnvOpenOptions;
 
 fn main() -> Result<(), Box<dyn Error>> {
     let env_path = Path::new("target").join("encrypt.mdb");
@@ -40,14 +40,14 @@ fn main() -> Result<(), Box<dyn Error>> {
     db.put(&mut wtxn, key1, val1)?;
     db.put(&mut wtxn, key2, val2)?;
     wtxn.commit()?;
-    // env.prepare_for_closing().wait();
+    env.prepare_for_closing().wait();
 
     // We reopen the environment now
-    let env = unsafe { options.open(&env_path)? };
+    let env = unsafe { options.open_encrypted::<ChaCha20Poly1305, _>(key, &env_path)? };
 
     // We check that the secret entries are correctly decrypted
     let mut rtxn = env.read_txn()?;
-    let db: Database<Str, Str> = env.open_database(&rtxn, Some("first"))?.unwrap();
+    let db = env.open_database::<Str, Str>(&rtxn, Some("first"))?.unwrap();
     let mut iter = db.iter(&mut rtxn)?;
     assert_eq!(iter.next().transpose()?, Some((key1, val1)));
     assert_eq!(iter.next().transpose()?, Some((key2, val2)));
diff --git a/heed/Cargo.toml b/heed/Cargo.toml
index 35374010..fb8eae96 100644
--- a/heed/Cargo.toml
+++ b/heed/Cargo.toml
@@ -20,6 +20,7 @@ lmdb-master-sys = { version = "0.2.4", path = "../lmdb-master-sys" }
 once_cell = "1.19.0"
 page_size = "0.6.0"
 serde = { version = "1.0.203", features = ["derive"], optional = true }
+synchronoise = "1.0.1"
 
 [dev-dependencies]
 serde = { version = "1.0.203", features = ["derive"] }
diff --git a/heed/src/envs/encrypted_env.rs b/heed/src/envs/encrypted_env.rs
index b37d5b2c..f78a8fad 100644
--- a/heed/src/envs/encrypted_env.rs
+++ b/heed/src/envs/encrypted_env.rs
@@ -6,12 +6,13 @@ use std::path::Path;
 use aead::generic_array::typenum::Unsigned;
 use aead::{AeadMutInPlace, Key, KeyInit, Nonce, Tag};
 
-use super::{Env, EnvInfo, FlagSetMode};
+use super::{Env, EnvClosingEvent, EnvInfo, FlagSetMode};
 use crate::databases::{EncryptedDatabase, EncryptedDatabaseOpenOptions};
 use crate::mdb::ffi::{self};
 use crate::{CompactionOption, EnvFlags, Result, RoTxn, RwTxn, Unspecified};
 
 /// An environment handle constructed by using [`EnvOpenOptions::open_encrypted`].
+#[derive(Clone)]
 pub struct EncryptedEnv {
     pub(crate) inner: Env,
 }
@@ -278,6 +279,15 @@ impl EncryptedEnv {
         self.inner.path()
     }
 
+    /// Returns an `EnvClosingEvent` that can be used to wait for the closing event,
+    /// multiple threads can wait on this event.
+    ///
+    /// Make sure that you drop all the copies of `Env`s you have, env closing are triggered
+    /// when all references are dropped, the last one will eventually close the environment.
+    pub fn prepare_for_closing(self) -> EnvClosingEvent {
+        self.inner.prepare_for_closing()
+    }
+
     /// Check for stale entries in the reader lock table and clear them.
     ///
     /// Returns the number of stale readers cleared.
@@ -311,7 +321,7 @@ unsafe impl Sync for EncryptedEnv {}
 impl fmt::Debug for EncryptedEnv {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("EncryptedEnv")
-            .field("path", &self.inner.path.display())
+            .field("path", &self.inner.path().display())
             .finish_non_exhaustive()
     }
 }
diff --git a/heed/src/envs/env.rs b/heed/src/envs/env.rs
index 3d8760d8..c7e60b41 100644
--- a/heed/src/envs/env.rs
+++ b/heed/src/envs/env.rs
@@ -3,13 +3,15 @@ use std::ffi::CString;
 use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::ptr::{self, NonNull};
+use std::sync::Arc;
 use std::{fmt, io, mem};
 
 use heed_traits::Comparator;
+use synchronoise::SignalEvent;
 
 use super::{
-    custom_key_cmp_wrapper, get_file_fd, metadata_from_fd, DefaultComparator, EnvInfo, FlagSetMode,
-    IntegerComparator, OPENED_ENV,
+    custom_key_cmp_wrapper, get_file_fd, metadata_from_fd, DefaultComparator, EnvClosingEvent,
+    EnvInfo, FlagSetMode, IntegerComparator, OPENED_ENV,
 };
 use crate::cursor::{MoveOperation, RoCursor};
 use crate::mdb::ffi::{self, MDB_env};
@@ -21,18 +23,24 @@ use crate::{
 };
 
 /// An environment handle constructed by using [`EnvOpenOptions::open`].
+#[derive(Clone)]
 pub struct Env {
-    env_ptr: NonNull<MDB_env>,
-    pub(crate) path: PathBuf,
+    inner: Arc<EnvInner>,
 }
 
 impl Env {
     pub(crate) fn new(env_ptr: NonNull<MDB_env>, path: PathBuf) -> Env {
-        Env { env_ptr, path }
+        Env {
+            inner: Arc::new(EnvInner {
+                env_ptr,
+                path,
+                signal_event: Arc::new(SignalEvent::manual(false)),
+            }),
+        }
     }
 
     pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
-        self.env_ptr
+        self.inner.env_ptr
     }
 
     /// The size of the data file on disk.
@@ -125,7 +133,7 @@ impl Env {
     /// Returns some basic informations about this environment.
     pub fn info(&self) -> EnvInfo {
         let mut raw_info = mem::MaybeUninit::uninit();
-        unsafe { ffi::mdb_env_info(self.env_ptr.as_ptr(), raw_info.as_mut_ptr()) };
+        unsafe { ffi::mdb_env_info(self.inner.env_ptr.as_ptr(), raw_info.as_mut_ptr()) };
         let raw_info = unsafe { raw_info.assume_init() };
 
         EnvInfo {
@@ -410,19 +418,28 @@ impl Env {
         option: CompactionOption,
     ) -> Result<()> {
         let flags = if let CompactionOption::Enabled = option { ffi::MDB_CP_COMPACT } else { 0 };
-        mdb_result(ffi::mdb_env_copyfd2(self.env_ptr.as_ptr(), fd, flags))?;
+        mdb_result(ffi::mdb_env_copyfd2(self.inner.env_ptr.as_ptr(), fd, flags))?;
         Ok(())
     }
 
     /// Flush the data buffers to disk.
     pub fn force_sync(&self) -> Result<()> {
-        unsafe { mdb_result(ffi::mdb_env_sync(self.env_ptr.as_ptr(), 1))? }
+        unsafe { mdb_result(ffi::mdb_env_sync(self.inner.env_ptr.as_ptr(), 1))? }
         Ok(())
     }
 
     /// Returns the canonicalized path where this env lives.
     pub fn path(&self) -> &Path {
-        &self.path
+        &self.inner.path
+    }
+
+    /// Returns an `EnvClosingEvent` that can be used to wait for the closing event,
+    /// multiple threads can wait on this event.
+    ///
+    /// Make sure that you drop all the copies of `Env`s you have, env closing are triggered
+    /// when all references are dropped, the last one will eventually close the environment.
+    pub fn prepare_for_closing(self) -> EnvClosingEvent {
+        EnvClosingEvent(self.inner.signal_event.clone())
     }
 
     /// Check for stale entries in the reader lock table and clear them.
@@ -430,7 +447,7 @@ impl Env {
     /// Returns the number of stale readers cleared.
     pub fn clear_stale_readers(&self) -> Result<usize> {
         let mut dead: i32 = 0;
-        unsafe { mdb_result(ffi::mdb_reader_check(self.env_ptr.as_ptr(), &mut dead))? }
+        unsafe { mdb_result(ffi::mdb_reader_check(self.inner.env_ptr.as_ptr(), &mut dead))? }
         // safety: The reader_check function asks for an i32, initialize it to zero
         //         and never decrements it. It is safe to use either an u32 or u64 (usize).
         Ok(dead as usize)
@@ -471,14 +488,24 @@ unsafe impl Sync for Env {}
 
 impl fmt::Debug for Env {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_struct("Env").field("path", &self.path.display()).finish_non_exhaustive()
+        f.debug_struct("Env").field("path", &self.inner.path.display()).finish_non_exhaustive()
     }
 }
 
-impl Drop for Env {
+pub(crate) struct EnvInner {
+    env_ptr: NonNull<MDB_env>,
+    signal_event: Arc<SignalEvent>,
+    pub(crate) path: PathBuf,
+}
+
+impl Drop for EnvInner {
     fn drop(&mut self) {
         unsafe { ffi::mdb_env_close(self.env_ptr.as_mut()) };
         let mut lock = OPENED_ENV.write().unwrap();
-        debug_assert!(lock.remove(&self.path));
+        let removed = lock.remove(&self.path);
+        debug_assert!(removed);
+        // The environment is now closed and unregistered: wake up every thread
+        // blocked on an `EnvClosingEvent` returned by `prepare_for_closing`.
+        self.signal_event.signal();
     }
 }
diff --git a/heed/src/envs/env_open_options.rs b/heed/src/envs/env_open_options.rs
index 72418c2e..859a91a2 100644
--- a/heed/src/envs/env_open_options.rs
+++ b/heed/src/envs/env_open_options.rs
@@ -213,7 +213,8 @@ impl EnvOpenOptions {
             match mdb_result(result) {
                 Ok(()) => {
                     let env_ptr = NonNull::new(env).unwrap();
-                    debug_assert!(lock.insert(path.clone()));
+                    let inserted = lock.insert(path.clone());
+                    debug_assert!(inserted);
                     Ok(Env::new(env_ptr, path))
                 }
                 Err(e) => {
@@ -404,7 +405,8 @@ impl EnvOpenOptions {
             match mdb_result(result) {
                 Ok(()) => {
                     let env_ptr = NonNull::new(env).unwrap();
-                    debug_assert!(lock.insert(path.clone()));
+                    let inserted = lock.insert(path.clone());
+                    debug_assert!(inserted);
                     Ok(EncryptedEnv { inner: Env::new(env_ptr, path) })
                 }
                 Err(e) => {
diff --git a/heed/src/envs/mod.rs b/heed/src/envs/mod.rs
index 9eb31dd3..3454099a 100644
--- a/heed/src/envs/mod.rs
+++ b/heed/src/envs/mod.rs
@@ -2,20 +2,22 @@ use std::cmp::Ordering;
 use std::collections::HashSet;
 use std::ffi::c_void;
 use std::fs::{File, Metadata};
-use std::io;
 #[cfg(unix)]
 use std::os::unix::io::{AsRawFd, BorrowedFd, RawFd};
 use std::panic::catch_unwind;
 use std::path::{Path, PathBuf};
 use std::process::abort;
-use std::sync::{LazyLock, RwLock};
+use std::sync::{Arc, LazyLock, RwLock};
+use std::time::Duration;
 #[cfg(windows)]
 use std::{
     ffi::OsStr,
     os::windows::io::{AsRawHandle as _, BorrowedHandle, RawHandle},
 };
+use std::{fmt, io};
 
 use heed_traits::{Comparator, LexicographicComparator};
+use synchronoise::event::SignalEvent;
 
 use crate::mdb::ffi;
 
@@ -50,6 +52,36 @@ pub struct EnvInfo {
     pub number_of_readers: u32,
 }
 
+/// A structure that can be used to wait for the closing event.
+/// Multiple threads can wait on this event.
+#[derive(Clone)]
+pub struct EnvClosingEvent(Arc<SignalEvent>);
+
+impl EnvClosingEvent {
+    /// Blocks this thread until the environment is effectively closed.
+    ///
+    /// # Safety
+    ///
+    /// Make sure that you don't have any copy of the environment in the thread
+    /// that is waiting for a close event. If you do, you will have a deadlock.
+    pub fn wait(&self) {
+        self.0.wait()
+    }
+
+    /// Blocks this thread until either the environment has been closed
+    /// or until the timeout elapses. Returns `true` if the environment
+    /// has been effectively closed.
+    pub fn wait_timeout(&self, timeout: Duration) -> bool {
+        self.0.wait_timeout(timeout)
+    }
+}
+
+impl fmt::Debug for EnvClosingEvent {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("EnvClosingEvent").finish()
+    }
+}
+
 // Thanks to the mozilla/rkv project
 // Workaround the UNC path on Windows, see https://github.com/rust-lang/rust/issues/42869.
 // Otherwise, `Env::from_env()` will panic with error_no(123).
diff --git a/heed/src/lib.rs b/heed/src/lib.rs
index bcbc19d7..0ce180e7 100644
--- a/heed/src/lib.rs
+++ b/heed/src/lib.rs
@@ -85,8 +85,8 @@ pub use self::databases::{EncryptedDatabase, EncryptedDatabaseOpenOptions};
 #[cfg(master3)]
 pub use self::envs::EncryptedEnv;
 pub use self::envs::{
-    CompactionOption, DefaultComparator, Env, EnvInfo, EnvOpenOptions, FlagSetMode,
-    IntegerComparator,
+    CompactionOption, DefaultComparator, Env, EnvClosingEvent, EnvInfo, EnvOpenOptions,
+    FlagSetMode, IntegerComparator,
 };
 pub use self::iterator::{
     RoIter, RoPrefix, RoRange, RoRevIter, RoRevPrefix, RoRevRange, RwIter, RwPrefix, RwRange,
diff --git a/heed/src/txn.rs b/heed/src/txn.rs
index 07ba369e..657f1262 100644
--- a/heed/src/txn.rs
+++ b/heed/src/txn.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::ops::Deref;
 use std::ptr::{self, NonNull};
 
@@ -47,7 +48,7 @@ use crate::Result;
 pub struct RoTxn<'e> {
     /// Makes the struct covariant and !Sync
     pub(crate) txn: Option<NonNull<ffi::MDB_txn>>,
-    env: &'e Env,
+    env: Cow<'e, Env>,
 }
 
 impl<'e> RoTxn<'e> {
@@ -63,24 +64,22 @@ impl<'e> RoTxn<'e> {
             ))?
         };
 
-        Ok(RoTxn { txn: NonNull::new(txn), env })
+        Ok(RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) })
     }
 
-    // TODO replace this by an ArcRoTxn
     pub(crate) fn static_read_txn(env: Env) -> Result<RoTxn<'static>> {
-        // let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
-
-        // unsafe {
-        //     mdb_result(ffi::mdb_txn_begin(
-        //         env.env_mut_ptr(),
-        //         ptr::null_mut(),
-        //         ffi::MDB_RDONLY,
-        //         &mut txn,
-        //     ))?
-        // };
-
-        // Ok(RoTxn { txn, env: Cow::Owned(env) })
-        todo!()
+        let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
+
+        unsafe {
+            mdb_result(ffi::mdb_txn_begin(
+                env.env_mut_ptr().as_mut(),
+                ptr::null_mut(),
+                ffi::MDB_RDONLY,
+                &mut txn,
+            ))?
+        };
+
+        Ok(RoTxn { txn: NonNull::new(txn), env: Cow::Owned(env) })
     }
 
     pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
@@ -172,7 +171,7 @@ impl<'p> RwTxn<'p> {
             ))?
         };
 
-        Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env } })
+        Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) } })
     }
 
     pub(crate) fn nested(env: &'p Env, parent: &'p mut RwTxn) -> Result<RwTxn<'p>> {
@@ -183,7 +182,7 @@ impl<'p> RwTxn<'p> {
             mdb_result(ffi::mdb_txn_begin(env.env_mut_ptr().as_mut(), parent_ptr, 0, &mut txn))?
         };
 
-        Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env } })
+        Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) } })
     }
 
     pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
diff --git a/heed3/Cargo.toml b/heed3/Cargo.toml
index 42a6a7b2..3ebac3c3 100644
--- a/heed3/Cargo.toml
+++ b/heed3/Cargo.toml
@@ -23,6 +23,7 @@ lmdb-master3-sys = { version = "0.2.4", path = "../lmdb-master3-sys" }
 once_cell = "1.19.0"
 page_size = "0.6.0"
 serde = { version = "1.0.203", features = ["derive"], optional = true }
+synchronoise = "1.0.1"
 
 [dev-dependencies]
 # TODO update dependencies