Skip to content

Commit 39c77f1

Browse files
authored
Merge pull request #285 from meilisearch/covariant-non-sync-rotxn
2 parents 3a54bfd + 15b6eac commit 39c77f1

File tree

4 files changed

+105
-42
lines changed

4 files changed

+105
-42
lines changed

heed/src/cursor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ pub struct RoCursor<'txn> {
1313
impl<'txn> RoCursor<'txn> {
1414
pub(crate) fn new(txn: &'txn RoTxn, dbi: ffi::MDB_dbi) -> Result<RoCursor<'txn>> {
1515
let mut cursor: *mut ffi::MDB_cursor = ptr::null_mut();
16-
unsafe { mdb_result(ffi::mdb_cursor_open(txn.txn, dbi, &mut cursor))? }
16+
let mut txn = txn.txn.unwrap();
17+
unsafe { mdb_result(ffi::mdb_cursor_open(txn.as_mut(), dbi, &mut cursor))? }
1718
Ok(RoCursor { cursor, _marker: marker::PhantomData })
1819
}
1920

heed/src/database.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl<'e, 'n, KC, DC, C> DatabaseOpenOptions<'e, 'n, KC, DC, C> {
139139
{
140140
assert_eq_env_txn!(self.env, rtxn);
141141

142-
match self.env.raw_init_database::<C>(rtxn.txn, self.name, self.flags) {
142+
match self.env.raw_init_database::<C>(rtxn.txn.unwrap(), self.name, self.flags) {
143143
Ok(dbi) => Ok(Some(Database::new(self.env.env_mut_ptr() as _, dbi))),
144144
Err(Error::Mdb(e)) if e.not_found() => Ok(None),
145145
Err(e) => Err(e),
@@ -164,7 +164,7 @@ impl<'e, 'n, KC, DC, C> DatabaseOpenOptions<'e, 'n, KC, DC, C> {
164164
assert_eq_env_txn!(self.env, wtxn);
165165

166166
let flags = self.flags | AllDatabaseFlags::CREATE;
167-
match self.env.raw_init_database::<C>(wtxn.txn.txn, self.name, flags) {
167+
match self.env.raw_init_database::<C>(wtxn.txn.txn.unwrap(), self.name, flags) {
168168
Ok(dbi) => Ok(Database::new(self.env.env_mut_ptr() as _, dbi)),
169169
Err(e) => Err(e),
170170
}
@@ -350,9 +350,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
350350

351351
let mut key_val = unsafe { crate::into_val(&key_bytes) };
352352
let mut data_val = mem::MaybeUninit::uninit();
353+
let mut txn = txn.txn.unwrap();
353354

354355
let result = unsafe {
355-
mdb_result(ffi::mdb_get(txn.txn, self.dbi, &mut key_val, data_val.as_mut_ptr()))
356+
mdb_result(ffi::mdb_get(txn.as_mut(), self.dbi, &mut key_val, data_val.as_mut_ptr()))
356357
};
357358

358359
match result {
@@ -954,7 +955,9 @@ impl<KC, DC, C> Database<KC, DC, C> {
954955
assert_eq_env_db_txn!(self, txn);
955956

956957
let mut db_stat = mem::MaybeUninit::uninit();
957-
let result = unsafe { mdb_result(ffi::mdb_stat(txn.txn, self.dbi, db_stat.as_mut_ptr())) };
958+
let mut txn = txn.txn.unwrap();
959+
let result =
960+
unsafe { mdb_result(ffi::mdb_stat(txn.as_mut(), self.dbi, db_stat.as_mut_ptr())) };
958961

959962
match result {
960963
Ok(()) => {
@@ -1835,9 +1838,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
18351838
let mut key_val = unsafe { crate::into_val(&key_bytes) };
18361839
let mut data_val = unsafe { crate::into_val(&data_bytes) };
18371840
let flags = 0;
1841+
let mut txn = txn.txn.txn.unwrap();
18381842

18391843
unsafe {
1840-
mdb_result(ffi::mdb_put(txn.txn.txn, self.dbi, &mut key_val, &mut data_val, flags))?
1844+
mdb_result(ffi::mdb_put(txn.as_mut(), self.dbi, &mut key_val, &mut data_val, flags))?
18411845
}
18421846

18431847
Ok(())
@@ -1896,9 +1900,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
18961900
let mut key_val = unsafe { crate::into_val(&key_bytes) };
18971901
let mut reserved = ffi::reserve_size_val(data_size);
18981902
let flags = ffi::MDB_RESERVE;
1903+
let mut txn = txn.txn.txn.unwrap();
18991904

19001905
unsafe {
1901-
mdb_result(ffi::mdb_put(txn.txn.txn, self.dbi, &mut key_val, &mut reserved, flags))?
1906+
mdb_result(ffi::mdb_put(txn.as_mut(), self.dbi, &mut key_val, &mut reserved, flags))?
19021907
}
19031908

19041909
let mut reserved = unsafe { ReservedSpace::from_val(reserved) };
@@ -1988,9 +1993,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
19881993
let mut key_val = unsafe { crate::into_val(&key_bytes) };
19891994
let mut data_val = unsafe { crate::into_val(&data_bytes) };
19901995
let flags = flags.bits();
1996+
let mut txn = txn.txn.txn.unwrap();
19911997

19921998
unsafe {
1993-
mdb_result(ffi::mdb_put(txn.txn.txn, self.dbi, &mut key_val, &mut data_val, flags))?
1999+
mdb_result(ffi::mdb_put(txn.as_mut(), self.dbi, &mut key_val, &mut data_val, flags))?
19942000
}
19952001

19962002
Ok(())
@@ -2095,9 +2101,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
20952101
let mut key_val = unsafe { crate::into_val(&key_bytes) };
20962102
let mut data_val = unsafe { crate::into_val(&data_bytes) };
20972103
let flags = (flags | PutFlags::NO_OVERWRITE).bits();
2104+
let mut txn = txn.txn.txn.unwrap();
20982105

20992106
let result = unsafe {
2100-
mdb_result(ffi::mdb_put(txn.txn.txn, self.dbi, &mut key_val, &mut data_val, flags))
2107+
mdb_result(ffi::mdb_put(txn.as_mut(), self.dbi, &mut key_val, &mut data_val, flags))
21012108
};
21022109

21032110
match result {
@@ -2245,9 +2252,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
22452252
let mut key_val = unsafe { crate::into_val(&key_bytes) };
22462253
let mut reserved = ffi::reserve_size_val(data_size);
22472254
let flags = (flags | PutFlags::NO_OVERWRITE).bits() | lmdb_master_sys::MDB_RESERVE;
2255+
let mut txn = txn.txn.txn.unwrap();
22482256

22492257
let result = unsafe {
2250-
mdb_result(ffi::mdb_put(txn.txn.txn, self.dbi, &mut key_val, &mut reserved, flags))
2258+
mdb_result(ffi::mdb_put(txn.as_mut(), self.dbi, &mut key_val, &mut reserved, flags))
22512259
};
22522260

22532261
match result {
@@ -2322,9 +2330,10 @@ impl<KC, DC, C> Database<KC, DC, C> {
23222330

23232331
let key_bytes: Cow<[u8]> = KC::bytes_encode(key).map_err(Error::Encoding)?;
23242332
let mut key_val = unsafe { crate::into_val(&key_bytes) };
2333+
let mut txn = txn.txn.txn.unwrap();
23252334

23262335
let result = unsafe {
2327-
mdb_result(ffi::mdb_del(txn.txn.txn, self.dbi, &mut key_val, ptr::null_mut()))
2336+
mdb_result(ffi::mdb_del(txn.as_mut(), self.dbi, &mut key_val, ptr::null_mut()))
23282337
};
23292338

23302339
match result {
@@ -2409,9 +2418,11 @@ impl<KC, DC, C> Database<KC, DC, C> {
24092418
let data_bytes: Cow<[u8]> = DC::bytes_encode(data).map_err(Error::Encoding)?;
24102419
let mut key_val = unsafe { crate::into_val(&key_bytes) };
24112420
let mut data_val = unsafe { crate::into_val(&data_bytes) };
2421+
let mut txn = txn.txn.txn.unwrap();
24122422

2413-
let result =
2414-
unsafe { mdb_result(ffi::mdb_del(txn.txn.txn, self.dbi, &mut key_val, &mut data_val)) };
2423+
let result = unsafe {
2424+
mdb_result(ffi::mdb_del(txn.as_mut(), self.dbi, &mut key_val, &mut data_val))
2425+
};
24152426

24162427
match result {
24172428
Ok(()) => Ok(true),
@@ -2533,8 +2544,9 @@ impl<KC, DC, C> Database<KC, DC, C> {
25332544
/// ```
25342545
pub fn clear(&self, txn: &mut RwTxn) -> Result<()> {
25352546
assert_eq_env_db_txn!(self, txn);
2547+
let mut txn = txn.txn.txn.unwrap();
25362548

2537-
unsafe { mdb_result(ffi::mdb_drop(txn.txn.txn, self.dbi, 0)).map_err(Into::into) }
2549+
unsafe { mdb_result(ffi::mdb_drop(txn.as_mut(), self.dbi, 0)).map_err(Into::into) }
25382550
}
25392551

25402552
/// Change the codec types of this database, specifying the codecs.

heed/src/env.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::os::unix::{
1212
use std::panic::catch_unwind;
1313
use std::path::{Path, PathBuf};
1414
use std::process::abort;
15+
use std::ptr::NonNull;
1516
use std::sync::{Arc, RwLock};
1617
use std::time::Duration;
1718
#[cfg(windows)]
@@ -605,7 +606,7 @@ impl Env {
605606

606607
let rtxn = self.read_txn()?;
607608
// Open the main database
608-
let dbi = self.raw_open_dbi::<DefaultComparator>(rtxn.txn, None, 0)?;
609+
let dbi = self.raw_open_dbi::<DefaultComparator>(rtxn.txn.unwrap(), None, 0)?;
609610

610611
// We're going to iterate on the unnamed database
611612
let mut cursor = RoCursor::new(&rtxn, dbi)?;
@@ -618,9 +619,12 @@ impl Env {
618619
let key = String::from_utf8(key.to_vec()).unwrap();
619620
// Calling `ffi::db_stat` on a database instance does not involve key comparison
620621
// in LMDB, so it's safe to specify a noop key compare function for it.
621-
if let Ok(dbi) = self.raw_open_dbi::<DefaultComparator>(rtxn.txn, Some(&key), 0) {
622+
if let Ok(dbi) =
623+
self.raw_open_dbi::<DefaultComparator>(rtxn.txn.unwrap(), Some(&key), 0)
624+
{
622625
let mut stat = mem::MaybeUninit::uninit();
623-
unsafe { mdb_result(ffi::mdb_stat(rtxn.txn, dbi, stat.as_mut_ptr()))? };
626+
let mut txn = rtxn.txn.unwrap();
627+
unsafe { mdb_result(ffi::mdb_stat(txn.as_mut(), dbi, stat.as_mut_ptr()))? };
624628
let stat = unsafe { stat.assume_init() };
625629
size += compute_size(stat);
626630
}
@@ -695,7 +699,7 @@ impl Env {
695699

696700
pub(crate) fn raw_init_database<C: Comparator + 'static>(
697701
&self,
698-
raw_txn: *mut ffi::MDB_txn,
702+
raw_txn: NonNull<ffi::MDB_txn>,
699703
name: Option<&str>,
700704
flags: AllDatabaseFlags,
701705
) -> Result<u32> {
@@ -707,7 +711,7 @@ impl Env {
707711

708712
fn raw_open_dbi<C: Comparator + 'static>(
709713
&self,
710-
raw_txn: *mut ffi::MDB_txn,
714+
mut raw_txn: NonNull<ffi::MDB_txn>,
711715
name: Option<&str>,
712716
flags: u32,
713717
) -> std::result::Result<u32, crate::mdb::lmdb_error::Error> {
@@ -721,9 +725,13 @@ impl Env {
721725
// safety: The name cstring is cloned by LMDB, we can drop it after.
722726
// If a read-only is used with the MDB_CREATE flag, LMDB will throw an error.
723727
unsafe {
724-
mdb_result(ffi::mdb_dbi_open(raw_txn, name_ptr, flags, &mut dbi))?;
728+
mdb_result(ffi::mdb_dbi_open(raw_txn.as_mut(), name_ptr, flags, &mut dbi))?;
725729
if TypeId::of::<C>() != TypeId::of::<DefaultComparator>() {
726-
mdb_result(ffi::mdb_set_compare(raw_txn, dbi, Some(custom_key_cmp_wrapper::<C>)))?;
730+
mdb_result(ffi::mdb_set_compare(
731+
raw_txn.as_mut(),
732+
dbi,
733+
Some(custom_key_cmp_wrapper::<C>),
734+
))?;
727735
}
728736
};
729737

heed/src/txn.rs

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::borrow::Cow;
22
use std::ops::Deref;
3-
use std::ptr;
3+
use std::ptr::{self, NonNull};
44

55
use crate::mdb::error::mdb_result;
66
use crate::mdb::ffi;
@@ -25,8 +25,28 @@ use crate::{Env, Result};
2525
/// Note: if your program already use POSIX semaphores, you will have less available for heed/LMDB!
2626
///
2727
/// You may increase the limit by editing it **at your own risk**: `/Library/LaunchDaemons/sysctl.plist`
28+
///
29+
/// ## This struct is covariant
30+
///
31+
/// ```rust
32+
/// #[allow(dead_code)]
33+
/// trait CovariantMarker<'a>: 'static {
34+
/// type T: 'a;
35+
///
36+
/// fn is_covariant(&'a self) -> &'a Self::T;
37+
/// }
38+
///
39+
/// impl<'a> CovariantMarker<'a> for heed::RoTxn<'static> {
40+
/// type T = heed::RoTxn<'a>;
41+
///
42+
/// fn is_covariant(&'a self) -> &'a heed::RoTxn<'a> {
43+
/// self
44+
/// }
45+
/// }
46+
/// ```
2847
pub struct RoTxn<'e> {
29-
pub(crate) txn: *mut ffi::MDB_txn,
48+
/// Makes the struct covariant and !Sync
49+
pub(crate) txn: Option<NonNull<ffi::MDB_txn>>,
3050
env: Cow<'e, Env>,
3151
}
3252

@@ -43,7 +63,7 @@ impl<'e> RoTxn<'e> {
4363
))?
4464
};
4565

46-
Ok(RoTxn { txn, env: Cow::Borrowed(env) })
66+
Ok(RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) })
4767
}
4868

4969
pub(crate) fn static_read_txn(env: Env) -> Result<RoTxn<'static>> {
@@ -58,7 +78,7 @@ impl<'e> RoTxn<'e> {
5878
))?
5979
};
6080

61-
Ok(RoTxn { txn, env: Cow::Owned(env) })
81+
Ok(RoTxn { txn: NonNull::new(txn), env: Cow::Owned(env) })
6282
}
6383

6484
pub(crate) fn env_mut_ptr(&self) -> *mut ffi::MDB_env {
@@ -75,29 +95,27 @@ impl<'e> RoTxn<'e> {
7595
/// After the transaction opening, the database is dropped. The next transaction might return
7696
/// `Io(Os { code: 22, kind: InvalidInput, message: "Invalid argument" })` known as `EINVAL`.
7797
pub fn commit(mut self) -> Result<()> {
78-
let result = unsafe { mdb_result(ffi::mdb_txn_commit(self.txn)) };
79-
self.txn = ptr::null_mut();
98+
// Asserts that the transaction hasn't been already
99+
// committed/aborter and ensure we cannot use it twice.
100+
let mut txn = self.txn.take().unwrap();
101+
let result = unsafe { mdb_result(ffi::mdb_txn_commit(txn.as_mut())) };
80102
result.map_err(Into::into)
81103
}
82104
}
83105

84106
impl Drop for RoTxn<'_> {
85107
fn drop(&mut self) {
86-
if !self.txn.is_null() {
87-
abort_txn(self.txn);
108+
if let Some(mut txn) = self.txn.take() {
109+
// Asserts that the transaction hasn't been already
110+
// committed/aborter and ensure we cannot use it twice.
111+
unsafe { ffi::mdb_txn_abort(txn.as_mut()) }
88112
}
89113
}
90114
}
91115

92116
#[cfg(feature = "read-txn-no-tls")]
93117
unsafe impl Send for RoTxn<'_> {}
94118

95-
fn abort_txn(txn: *mut ffi::MDB_txn) {
96-
// Asserts that the transaction hasn't been already committed.
97-
assert!(!txn.is_null());
98-
unsafe { ffi::mdb_txn_abort(txn) }
99-
}
100-
101119
/// A read-write transaction.
102120
///
103121
/// ## LMDB Limitations
@@ -116,6 +134,25 @@ fn abort_txn(txn: *mut ffi::MDB_txn) {
116134
/// Note: if your program already use POSIX semaphores, you will have less available for heed/LMDB!
117135
///
118136
/// You may increase the limit by editing it **at your own risk**: `/Library/LaunchDaemons/sysctl.plist`
137+
///
138+
/// ## This struct is covariant
139+
///
140+
/// ```rust
141+
/// #[allow(dead_code)]
142+
/// trait CovariantMarker<'a>: 'static {
143+
/// type T: 'a;
144+
///
145+
/// fn is_covariant(&'a self) -> &'a Self::T;
146+
/// }
147+
///
148+
/// impl<'a> CovariantMarker<'a> for heed::RwTxn<'static> {
149+
/// type T = heed::RwTxn<'a>;
150+
///
151+
/// fn is_covariant(&'a self) -> &'a heed::RwTxn<'a> {
152+
/// self
153+
/// }
154+
/// }
155+
/// ```
119156
pub struct RwTxn<'p> {
120157
pub(crate) txn: RoTxn<'p>,
121158
}
@@ -126,16 +163,17 @@ impl<'p> RwTxn<'p> {
126163

127164
unsafe { mdb_result(ffi::mdb_txn_begin(env.env_mut_ptr(), ptr::null_mut(), 0, &mut txn))? };
128165

129-
Ok(RwTxn { txn: RoTxn { txn, env: Cow::Borrowed(env) } })
166+
Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) } })
130167
}
131168

132169
pub(crate) fn nested(env: &'p Env, parent: &'p mut RwTxn) -> Result<RwTxn<'p>> {
133170
let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
134-
let parent_ptr: *mut ffi::MDB_txn = parent.txn.txn;
171+
let mut parent_txn = parent.txn.txn.unwrap();
172+
let parent_ptr: *mut ffi::MDB_txn = unsafe { parent_txn.as_mut() };
135173

136174
unsafe { mdb_result(ffi::mdb_txn_begin(env.env_mut_ptr(), parent_ptr, 0, &mut txn))? };
137175

138-
Ok(RwTxn { txn: RoTxn { txn, env: Cow::Borrowed(env) } })
176+
Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) } })
139177
}
140178

141179
pub(crate) fn env_mut_ptr(&self) -> *mut ffi::MDB_env {
@@ -145,16 +183,20 @@ impl<'p> RwTxn<'p> {
145183
/// Commit all the operations of a transaction into the database.
146184
/// The transaction is reset.
147185
pub fn commit(mut self) -> Result<()> {
148-
let result = unsafe { mdb_result(ffi::mdb_txn_commit(self.txn.txn)) };
149-
self.txn.txn = ptr::null_mut();
186+
// Asserts that the transaction hasn't been already
187+
// committed/aborter and ensure we cannot use it two times.
188+
let mut txn = self.txn.txn.take().unwrap();
189+
let result = unsafe { mdb_result(ffi::mdb_txn_commit(txn.as_mut())) };
150190
result.map_err(Into::into)
151191
}
152192

153193
/// Abandon all the operations of the transaction instead of saving them.
154194
/// The transaction is reset.
155195
pub fn abort(mut self) {
156-
abort_txn(self.txn.txn);
157-
self.txn.txn = ptr::null_mut();
196+
// Asserts that the transaction hasn't been already
197+
// committed/aborter and ensure we cannot use it twice.
198+
let mut txn = self.txn.txn.take().unwrap();
199+
unsafe { ffi::mdb_txn_abort(txn.as_mut()) }
158200
}
159201
}
160202

0 commit comments

Comments
 (0)