Skip to content

Commit

Permalink
add a MutexExt trait to avoid deadlocks using Mutex::lock() (#4934)
Browse files Browse the repository at this point in the history
* add a MutexExt trait to avoid deadlocks using Mutex::lock()

* add changelog entry

* apply review suggestions

* replace ThreadStateGuard with SuspendGIL

* use try_lock before attempting a blocking lock

* collapse unnecessary match

Co-authored-by: David Hewitt <[email protected]>

---------

Co-authored-by: David Hewitt <[email protected]>
  • Loading branch information
ngoldbaum and davidhewitt authored Feb 27, 2025
1 parent 8567b6e commit 72f9e75
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 34 deletions.
4 changes: 3 additions & 1 deletion guide/src/class/thread-safety.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ impl MyClass {
}
```

If you need to lock around state stored in the Python interpreter or otherwise call into the Python C API while a lock is held, you might find the `MutexExt` trait useful. It provides a `lock_py_attached` method for `std::sync::Mutex` that avoids deadlocks with the GIL or other global synchronization events in the interpreter.

### Wrapping unsynchronized data

In some cases, the data structures stored within a `#[pyclass]` may themselves not be thread-safe. Rust will therefore not implement `Send` and `Sync` on the `#[pyclass]` type.

To achieve thread-safety, a manual `Send` and `Sync` implementation is required which is `unsafe` and should only be done following careful review of the soundness of the implementation. Doing this for PyO3 types is no different than for any other Rust code, [the Rustonomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html) has a great discussion on this.
To achieve thread-safety, a manual `Send` and `Sync` implementation is required which is `unsafe` and should only be done following careful review of the soundness of the implementation. Doing this for PyO3 types is no different than for any other Rust code, [the Rustonomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html) has a great discussion on this.
15 changes: 6 additions & 9 deletions guide/src/free-threading.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,18 +387,15 @@ Python::with_gil(|py| {
# }
```

If you are executing arbitrary Python code while holding the lock, then you will
need to use conditional compilation to use [`GILProtected`] on GIL-enabled Python
builds and mutexes otherwise. If your use of [`GILProtected`] does not guard the
execution of arbitrary Python code or use of the CPython C API, then conditional
compilation is likely unnecessary since [`GILProtected`] was not needed in the
first place and instead Rust mutexes or atomics should be preferred. Python 3.13
introduces [`PyMutex`](https://docs.python.org/3/c-api/init.html#c.PyMutex),
which releases the GIL while the waiting for the lock, so that is another option
if you only need to support newer Python versions.
If you are executing arbitrary Python code while holding the lock, then you
should import the [`MutexExt`] trait and use the `lock_py_attached` method
instead of `lock`. This ensures that global synchronization events started by
the Python runtime can proceed, avoiding possible deadlocks with the
interpreter.

[`GILOnceCell`]: {{#PYO3_DOCS_URL}}/pyo3/sync/struct.GILOnceCell.html
[`GILProtected`]: https://docs.rs/pyo3/0.22/pyo3/sync/struct.GILProtected.html
[`MutexExt`]: {{#PYO3_DOCS_URL}}/pyo3/sync/trait.MutexExt.html
[`Once`]: https://doc.rust-lang.org/stable/std/sync/struct.Once.html
[`Once::call_once`]: https://doc.rust-lang.org/stable/std/sync/struct.Once.html#tymethod.call_once
[`Once::call_once_force`]: https://doc.rust-lang.org/stable/std/sync/struct.Once.html#tymethod.call_once_force
Expand Down
1 change: 1 addition & 0 deletions newsfragments/4934.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added `MutextExt`, an extension trait to avoid deadlocks with the GIL while locking a `std::sync::Mutex`.
1 change: 1 addition & 0 deletions src/sealed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ impl<T: crate::type_object::PyTypeInfo> Sealed for PyNativeTypeInitializer<T> {}
impl<T: crate::pyclass::PyClass> Sealed for PyClassInitializer<T> {}

impl Sealed for std::sync::Once {}
impl<T> Sealed for std::sync::Mutex<T> {}
148 changes: 124 additions & 24 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//!
//! [PEP 703]: https://peps.python.org/pep-703/
use crate::{
ffi,
gil::SuspendGIL,
sealed::Sealed,
types::{any::PyAnyMethods, PyAny, PyString},
Bound, Py, PyResult, PyTypeCheck, Python,
Expand Down Expand Up @@ -498,7 +498,7 @@ pub trait OnceExt: Sealed {
fn call_once_force_py_attached(&self, py: Python<'_>, f: impl FnOnce(&OnceState));
}

// Extension trait for [`std::sync::OnceLock`] which helps avoid deadlocks between the Python
/// Extension trait for [`std::sync::OnceLock`] which helps avoid deadlocks between the Python
/// interpreter and initialization with the `OnceLock`.
#[cfg(rustc_has_once_lock)]
pub trait OnceLockExt<T>: once_lock_ext_sealed::Sealed {
Expand All @@ -516,12 +516,20 @@ pub trait OnceLockExt<T>: once_lock_ext_sealed::Sealed {
F: FnOnce() -> T;
}

struct Guard(*mut crate::ffi::PyThreadState);

impl Drop for Guard {
fn drop(&mut self) {
unsafe { ffi::PyEval_RestoreThread(self.0) };
}
/// Extension trait for [`std::sync::Mutex`] which helps avoid deadlocks between
/// the Python interpreter and acquiring the `Mutex`.
pub trait MutexExt<T>: Sealed {
/// Lock this `Mutex` in a manner that cannot deadlock with the Python interpreter.
///
/// Before attempting to lock the mutex, this function detaches from the
/// Python runtime. When the lock is acquired, it re-attaches to the Python
/// runtime before returning the `LockResult`. This avoids deadlocks between
/// the GIL and other global synchronization events triggered by the Python
/// interpreter.
fn lock_py_attached(
&self,
py: Python<'_>,
) -> std::sync::LockResult<std::sync::MutexGuard<'_, T>>;
}

impl OnceExt for Once {
Expand Down Expand Up @@ -557,14 +565,41 @@ impl<T> OnceLockExt<T> for std::sync::OnceLock<T> {
}
}

impl<T> MutexExt<T> for std::sync::Mutex<T> {
fn lock_py_attached(
&self,
_py: Python<'_>,
) -> std::sync::LockResult<std::sync::MutexGuard<'_, T>> {
// If try_lock is successful or returns a poisoned mutex, return them so
// the caller can deal with them. Otherwise we need to use blocking
// lock, which requires detaching from the Python runtime to avoid
// possible deadlocks.
match self.try_lock() {
Ok(inner) => return Ok(inner),
Err(std::sync::TryLockError::Poisoned(inner)) => {
return std::sync::LockResult::Err(inner)
}
Err(std::sync::TryLockError::WouldBlock) => {}
}
// SAFETY: detach from the runtime right before a possibly blocking call
// then reattach when the blocking call completes and before calling
// into the C API.
let ts_guard = unsafe { SuspendGIL::new() };
let res = self.lock();
drop(ts_guard);
res
}
}

#[cold]
fn init_once_py_attached<F, T>(once: &Once, _py: Python<'_>, f: F)
where
F: FnOnce() -> T,
{
// Safety: we are currently attached to the GIL, and we expect to block. We will save
// the current thread state and restore it as soon as we are done blocking.
let ts_guard = Guard(unsafe { ffi::PyEval_SaveThread() });
// SAFETY: detach from the runtime right before a possibly blocking call
// then reattach when the blocking call completes and before calling
// into the C API.
let ts_guard = unsafe { SuspendGIL::new() };

once.call_once(move || {
drop(ts_guard);
Expand All @@ -577,9 +612,10 @@ fn init_once_force_py_attached<F, T>(once: &Once, _py: Python<'_>, f: F)
where
F: FnOnce(&OnceState) -> T,
{
// Safety: we are currently attached to the GIL, and we expect to block. We will save
// the current thread state and restore it as soon as we are done blocking.
let ts_guard = Guard(unsafe { ffi::PyEval_SaveThread() });
// SAFETY: detach from the runtime right before a possibly blocking call
// then reattach when the blocking call completes and before calling
// into the C API.
let ts_guard = unsafe { SuspendGIL::new() };

once.call_once_force(move |state| {
drop(ts_guard);
Expand All @@ -597,8 +633,10 @@ fn init_once_lock_py_attached<'a, F, T>(
where
F: FnOnce() -> T,
{
// SAFETY: we are currently attached to a Python thread
let ts_guard = Guard(unsafe { ffi::PyEval_SaveThread() });
// SAFETY: detach from the runtime right before a possibly blocking call
// then reattach when the blocking call completes and before calling
// into the C API.
let ts_guard = unsafe { SuspendGIL::new() };

// this trait is guarded by a rustc version config
// so clippy's MSRV check is wrong
Expand All @@ -618,6 +656,19 @@ mod tests {
use super::*;

use crate::types::{PyDict, PyDictMethods};
#[cfg(not(target_arch = "wasm32"))]
use std::sync::Mutex;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "macros")]
use std::sync::{
atomic::{AtomicBool, Ordering},
Barrier,
};

#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "macros")]
#[crate::pyclass(crate = "crate")]
struct BoolWrapper(AtomicBool);

#[test]
fn test_intern() {
Expand Down Expand Up @@ -692,16 +743,8 @@ mod tests {
#[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled
#[test]
fn test_critical_section() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Barrier,
};

let barrier = Barrier::new(2);

#[crate::pyclass(crate = "crate")]
struct BoolWrapper(AtomicBool);

let bool_wrapper = Python::with_gil(|py| -> Py<BoolWrapper> {
Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap()
});
Expand Down Expand Up @@ -781,4 +824,61 @@ mod tests {
});
assert_eq!(cell.get(), Some(&12345));
}

#[cfg(feature = "macros")]
#[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled
#[test]
fn test_mutex_ext() {
let barrier = Barrier::new(2);

let mutex = Python::with_gil(|py| -> Mutex<Py<BoolWrapper>> {
Mutex::new(Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap())
});

std::thread::scope(|s| {
s.spawn(|| {
Python::with_gil(|py| {
let b = mutex.lock_py_attached(py).unwrap();
barrier.wait();
// sleep to ensure the other thread actually blocks
std::thread::sleep(std::time::Duration::from_millis(10));
(*b).bind(py).borrow().0.store(true, Ordering::Release);
drop(b);
});
});
s.spawn(|| {
barrier.wait();
Python::with_gil(|py| {
// blocks until the other thread releases the lock
let b = mutex.lock_py_attached(py).unwrap();
assert!((*b).bind(py).borrow().0.load(Ordering::Acquire));
});
});
});
}

#[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled
#[test]
fn test_mutex_ext_poison() {
let mutex = Mutex::new(42);

std::thread::scope(|s| {
let lock_result = s.spawn(|| {
Python::with_gil(|py| {
let _unused = mutex.lock_py_attached(py);
panic!();
});
});
assert!(lock_result.join().is_err());
assert!(mutex.is_poisoned());
});
let guard = Python::with_gil(|py| {
// recover from the poisoning
match mutex.lock_py_attached(py) {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
}
});
assert!(*guard == 42);
}
}

0 comments on commit 72f9e75

Please sign in to comment.