Skip to content

Commit

Permalink
Reintroduce the possibility to move iterators between threads when po…
Browse files Browse the repository at this point in the history
…ssible
  • Loading branch information
Kerollmops committed Mar 5, 2025
1 parent a3c4f74 commit 92003c2
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 107 deletions.
14 changes: 7 additions & 7 deletions heed/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use crate::mdb::error::mdb_result;
use crate::mdb::ffi;
use crate::*;

pub struct RoCursor<'txn> {
pub struct RoCursor<'txn, T> {
cursor: *mut ffi::MDB_cursor,
_marker: marker::PhantomData<&'txn ()>,
_marker: marker::PhantomData<&'txn T>,
}

impl<'txn> RoCursor<'txn> {
impl<'txn, T> RoCursor<'txn, T> {
// TODO should I ask for a &mut RoTxn<'_, T>, here?
pub(crate) fn new<T>(txn: &'txn RoTxn<'_, T>, dbi: ffi::MDB_dbi) -> Result<RoCursor<'txn>> {
pub(crate) fn new(txn: &'txn RoTxn<'_, T>, dbi: ffi::MDB_dbi) -> Result<RoCursor<'txn, T>> {
let mut cursor: *mut ffi::MDB_cursor = ptr::null_mut();
let mut txn = txn.txn_ptr();
unsafe { mdb_result(ffi::mdb_cursor_open(txn.as_mut(), dbi, &mut cursor))? }
Expand Down Expand Up @@ -237,14 +237,14 @@ impl<'txn> RoCursor<'txn> {
}
}

impl Drop for RoCursor<'_> {
impl<T> Drop for RoCursor<'_, T> {
fn drop(&mut self) {
unsafe { ffi::mdb_cursor_close(self.cursor) }
}
}

pub struct RwCursor<'txn> {
cursor: RoCursor<'txn>,
cursor: RoCursor<'txn, WithoutTls>,
}

impl<'txn> RwCursor<'txn> {
Expand Down Expand Up @@ -404,7 +404,7 @@ impl<'txn> RwCursor<'txn> {
}

impl<'txn> Deref for RwCursor<'txn> {
type Target = RoCursor<'txn>;
type Target = RoCursor<'txn, WithoutTls>;

fn deref(&self) -> &Self::Target {
&self.cursor
Expand Down
14 changes: 7 additions & 7 deletions heed/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
&self,
txn: &'txn RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<RoIter<'txn, KC, DC, MoveOnCurrentKeyDuplicates>>>
) -> Result<Option<RoIter<'txn, T, KC, DC, MoveOnCurrentKeyDuplicates>>>
where
KC: BytesEncode<'a>,
{
Expand Down Expand Up @@ -1026,7 +1026,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn iter<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<RoIter<'txn, KC, DC>> {
pub fn iter<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<RoIter<'txn, T, KC, DC>> {
assert_eq_env_db_txn!(self, txn);
RoCursor::new(txn, self.dbi).map(|cursor| RoIter::new(cursor))
}
Expand Down Expand Up @@ -1128,7 +1128,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn rev_iter<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<RoRevIter<'txn, KC, DC>> {
pub fn rev_iter<'txn, T>(&self, txn: &'txn RoTxn<T>) -> Result<RoRevIter<'txn, T, KC, DC>> {
assert_eq_env_db_txn!(self, txn);

RoCursor::new(txn, self.dbi).map(|cursor| RoRevIter::new(cursor))
Expand Down Expand Up @@ -1239,7 +1239,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
&self,
txn: &'txn RoTxn<T>,
range: &'a R,
) -> Result<RoRange<'txn, KC, DC, C>>
) -> Result<RoRange<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
R: RangeBounds<KC::EItem>,
Expand Down Expand Up @@ -1412,7 +1412,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
&self,
txn: &'txn RoTxn<T>,
range: &'a R,
) -> Result<RoRevRange<'txn, KC, DC, C>>
) -> Result<RoRevRange<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
R: RangeBounds<KC::EItem>,
Expand Down Expand Up @@ -1587,7 +1587,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
&self,
txn: &'txn RoTxn<T>,
prefix: &'a KC::EItem,
) -> Result<RoPrefix<'txn, KC, DC, C>>
) -> Result<RoPrefix<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
C: LexicographicComparator,
Expand Down Expand Up @@ -1720,7 +1720,7 @@ impl<KC, DC, C> Database<KC, DC, C> {
&self,
txn: &'txn RoTxn<'_, T>,
prefix: &'a KC::EItem,
) -> Result<RoRevPrefix<'txn, KC, DC, C>>
) -> Result<RoRevPrefix<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
C: LexicographicComparator,
Expand Down
14 changes: 7 additions & 7 deletions heed/src/databases/encrypted_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
&self,
txn: &'txn mut RoTxn<T>,
key: &'a KC::EItem,
) -> Result<Option<RoIter<'txn, KC, DC, MoveOnCurrentKeyDuplicates>>>
) -> Result<Option<RoIter<'txn, T, KC, DC, MoveOnCurrentKeyDuplicates>>>
where
KC: BytesEncode<'a>,
{
Expand Down Expand Up @@ -862,7 +862,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn iter<'txn, T>(&self, txn: &'txn mut RoTxn<T>) -> Result<RoIter<'txn, KC, DC>> {
pub fn iter<'txn, T>(&self, txn: &'txn mut RoTxn<T>) -> Result<RoIter<'txn, T, KC, DC>> {
self.inner.iter(txn)
}

Expand Down Expand Up @@ -961,7 +961,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn rev_iter<'txn, T>(&self, txn: &'txn mut RoTxn<T>) -> Result<RoRevIter<'txn, KC, DC>> {
pub fn rev_iter<'txn, T>(&self, txn: &'txn mut RoTxn<T>) -> Result<RoRevIter<'txn, T, KC, DC>> {
self.inner.rev_iter(txn)
}

Expand Down Expand Up @@ -1068,7 +1068,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
&self,
txn: &'txn mut RoTxn<T>,
range: &'a R,
) -> Result<RoRange<'txn, KC, DC, C>>
) -> Result<RoRange<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
R: RangeBounds<KC::EItem>,
Expand Down Expand Up @@ -1191,7 +1191,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
&self,
txn: &'txn mut RoTxn<T>,
range: &'a R,
) -> Result<RoRevRange<'txn, KC, DC, C>>
) -> Result<RoRevRange<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
R: RangeBounds<KC::EItem>,
Expand Down Expand Up @@ -1315,7 +1315,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
&self,
txn: &'txn mut RoTxn<T>,
prefix: &'a KC::EItem,
) -> Result<RoPrefix<'txn, KC, DC, C>>
) -> Result<RoPrefix<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
C: LexicographicComparator,
Expand Down Expand Up @@ -1440,7 +1440,7 @@ impl<KC, DC, C> EncryptedDatabase<KC, DC, C> {
&self,
txn: &'txn mut RoTxn<T>,
prefix: &'a KC::EItem,
) -> Result<RoRevPrefix<'txn, KC, DC, C>>
) -> Result<RoRevPrefix<'txn, T, KC, DC, C>>
where
KC: BytesEncode<'a>,
C: LexicographicComparator,
Expand Down
53 changes: 28 additions & 25 deletions heed/src/iterator/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use crate::iteration_method::{IterationMethod, MoveBetweenKeys, MoveThroughDupli
use crate::*;

/// A read-only iterator structure.
pub struct RoIter<'txn, KC, DC, IM = MoveThroughDuplicateValues> {
cursor: RoCursor<'txn>,
pub struct RoIter<'txn, T, KC, DC, IM = MoveThroughDuplicateValues> {
cursor: RoCursor<'txn, T>,
move_on_first: bool,
_phantom: marker::PhantomData<(KC, DC, IM)>,
}

impl<'txn, KC, DC, IM> RoIter<'txn, KC, DC, IM> {
pub(crate) fn new(cursor: RoCursor<'txn>) -> RoIter<'txn, KC, DC, IM> {
impl<'txn, T, KC, DC, IM> RoIter<'txn, T, KC, DC, IM> {
pub(crate) fn new(cursor: RoCursor<'txn, T>) -> RoIter<'txn, T, KC, DC, IM> {
RoIter { cursor, move_on_first: true, _phantom: marker::PhantomData }
}

Expand Down Expand Up @@ -63,7 +63,7 @@ impl<'txn, KC, DC, IM> RoIter<'txn, KC, DC, IM> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn move_between_keys(self) -> RoIter<'txn, KC, DC, MoveBetweenKeys> {
pub fn move_between_keys(self) -> RoIter<'txn, T, KC, DC, MoveBetweenKeys> {
RoIter {
cursor: self.cursor,
move_on_first: self.move_on_first,
Expand Down Expand Up @@ -119,7 +119,9 @@ impl<'txn, KC, DC, IM> RoIter<'txn, KC, DC, IM> {
/// wtxn.commit()?;
/// # Ok(()) }
/// ```
pub fn move_through_duplicate_values(self) -> RoIter<'txn, KC, DC, MoveThroughDuplicateValues> {
pub fn move_through_duplicate_values(
self,
) -> RoIter<'txn, T, KC, DC, MoveThroughDuplicateValues> {
RoIter {
cursor: self.cursor,
move_on_first: self.move_on_first,
Expand All @@ -128,7 +130,7 @@ impl<'txn, KC, DC, IM> RoIter<'txn, KC, DC, IM> {
}

/// Change the codec types of this iterator, specifying the codecs.
pub fn remap_types<KC2, DC2>(self) -> RoIter<'txn, KC2, DC2, IM> {
pub fn remap_types<KC2, DC2>(self) -> RoIter<'txn, T, KC2, DC2, IM> {
RoIter {
cursor: self.cursor,
move_on_first: self.move_on_first,
Expand All @@ -137,22 +139,22 @@ impl<'txn, KC, DC, IM> RoIter<'txn, KC, DC, IM> {
}

/// Change the key codec type of this iterator, specifying the new codec.
pub fn remap_key_type<KC2>(self) -> RoIter<'txn, KC2, DC, IM> {
pub fn remap_key_type<KC2>(self) -> RoIter<'txn, T, KC2, DC, IM> {
self.remap_types::<KC2, DC>()
}

/// Change the data codec type of this iterator, specifying the new codec.
pub fn remap_data_type<DC2>(self) -> RoIter<'txn, KC, DC2, IM> {
pub fn remap_data_type<DC2>(self) -> RoIter<'txn, T, KC, DC2, IM> {
self.remap_types::<KC, DC2>()
}

/// Wrap the data bytes into a lazy decoder.
pub fn lazily_decode_data(self) -> RoIter<'txn, KC, LazyDecode<DC>, IM> {
pub fn lazily_decode_data(self) -> RoIter<'txn, T, KC, LazyDecode<DC>, IM> {
self.remap_types::<KC, LazyDecode<DC>>()
}
}

impl<'txn, KC, DC, IM> Iterator for RoIter<'txn, KC, DC, IM>
impl<'txn, T, KC, DC, IM> Iterator for RoIter<'txn, T, KC, DC, IM>
where
KC: BytesDecode<'txn>,
DC: BytesDecode<'txn>,
Expand Down Expand Up @@ -208,8 +210,8 @@ impl<KC, DC, IM> fmt::Debug for RoIter<'_, KC, DC, IM> {
}
}

/// A `RoIter` is `Send` only if the `RoTxn` is.
unsafe impl<KC, DC, IM> Send for RoIter<'_, KC, DC, IM> {}
// Only implement Send if the transaction is Send (WithoutTls)
unsafe impl<KC, DC, IM> Send for RoIter<'_, WithoutTls, KC, DC, IM> {}

/// A read-write iterator structure.
pub struct RwIter<'txn, KC, DC, IM = MoveThroughDuplicateValues> {
Expand Down Expand Up @@ -441,21 +443,21 @@ impl<KC, DC, IM> fmt::Debug for RwIter<'_, KC, DC, IM> {
}

/// A reverse read-only iterator structure.
pub struct RoRevIter<'txn, KC, DC, IM = MoveThroughDuplicateValues> {
cursor: RoCursor<'txn>,
pub struct RoRevIter<'txn, T, KC, DC, IM = MoveThroughDuplicateValues> {
cursor: RoCursor<'txn, T>,
move_on_last: bool,
_phantom: marker::PhantomData<(KC, DC, IM)>,
}

impl<'txn, KC, DC, IM> RoRevIter<'txn, KC, DC, IM> {
pub(crate) fn new(cursor: RoCursor<'txn>) -> RoRevIter<'txn, KC, DC, IM> {
impl<'txn, T, KC, DC, IM> RoRevIter<'txn, T, KC, DC, IM> {
pub(crate) fn new(cursor: RoCursor<'txn, T>) -> RoRevIter<'txn, T, KC, DC, IM> {
RoRevIter { cursor, move_on_last: true, _phantom: marker::PhantomData }
}

/// Move on the first value of keys, ignoring duplicate values.
///
/// For more info, see [`RoIter::move_between_keys`].
pub fn move_between_keys(self) -> RoRevIter<'txn, KC, DC, MoveBetweenKeys> {
pub fn move_between_keys(self) -> RoRevIter<'txn, T, KC, DC, MoveBetweenKeys> {
RoRevIter {
cursor: self.cursor,
move_on_last: self.move_on_last,
Expand All @@ -468,7 +470,7 @@ impl<'txn, KC, DC, IM> RoRevIter<'txn, KC, DC, IM> {
/// For more info, see [`RoIter::move_through_duplicate_values`].
pub fn move_through_duplicate_values(
self,
) -> RoRevIter<'txn, KC, DC, MoveThroughDuplicateValues> {
) -> RoRevIter<'txn, T, KC, DC, MoveThroughDuplicateValues> {
RoRevIter {
cursor: self.cursor,
move_on_last: self.move_on_last,
Expand All @@ -477,7 +479,7 @@ impl<'txn, KC, DC, IM> RoRevIter<'txn, KC, DC, IM> {
}

/// Change the codec types of this iterator, specifying the codecs.
pub fn remap_types<KC2, DC2>(self) -> RoRevIter<'txn, KC2, DC2, IM> {
pub fn remap_types<KC2, DC2>(self) -> RoRevIter<'txn, T, KC2, DC2, IM> {
RoRevIter {
cursor: self.cursor,
move_on_last: self.move_on_last,
Expand All @@ -486,22 +488,22 @@ impl<'txn, KC, DC, IM> RoRevIter<'txn, KC, DC, IM> {
}

/// Change the key codec type of this iterator, specifying the new codec.
pub fn remap_key_type<KC2>(self) -> RoRevIter<'txn, KC2, DC, IM> {
pub fn remap_key_type<KC2>(self) -> RoRevIter<'txn, T, KC2, DC, IM> {
self.remap_types::<KC2, DC>()
}

/// Change the data codec type of this iterator, specifying the new codec.
pub fn remap_data_type<DC2>(self) -> RoRevIter<'txn, KC, DC2, IM> {
pub fn remap_data_type<DC2>(self) -> RoRevIter<'txn, T, KC, DC2, IM> {
self.remap_types::<KC, DC2>()
}

/// Wrap the data bytes into a lazy decoder.
pub fn lazily_decode_data(self) -> RoRevIter<'txn, KC, LazyDecode<DC>, IM> {
pub fn lazily_decode_data(self) -> RoRevIter<'txn, T, KC, LazyDecode<DC>, IM> {
self.remap_types::<KC, LazyDecode<DC>>()
}
}

impl<'txn, KC, DC, IM> Iterator for RoRevIter<'txn, KC, DC, IM>
impl<'txn, T, KC, DC, IM> Iterator for RoRevIter<'txn, T, KC, DC, IM>
where
KC: BytesDecode<'txn>,
DC: BytesDecode<'txn>,
Expand Down Expand Up @@ -557,7 +559,8 @@ impl<KC, DC, IM> fmt::Debug for RoRevIter<'_, KC, DC, IM> {
}
}

unsafe impl<KC, DC, IM> Send for RoRevIter<'_, KC, DC, IM> {}
// Only implement Send if the transaction is Send (WithoutTls)
unsafe impl<KC, DC, IM> Send for RoRevIter<'_, WithoutTls, KC, DC, IM> {}

/// A reverse read-write iterator structure.
pub struct RwRevIter<'txn, KC, DC, IM = MoveThroughDuplicateValues> {
Expand Down
Loading

0 comments on commit 92003c2

Please sign in to comment.