Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Feb 19, 2025
1 parent b36d343 commit ac4a5f8
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions relay-threading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ authors = ["Sentry <[email protected]>"]
description = "Threading code that is used by Relay"
homepage = "https://getsentry.github.io/relay/"
repository = "https://github.com/getsentry/relay"
version = "25.1.0"
version = "25.2.0"
edition = "2021"
license-file = "../LICENSE.md"
publish = false

[dependencies]
flume = { workspace = true }
futures = { workspace = true, features = ["executor"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync", "macros"] }
flume = { workspace = true, default-features = false }
futures = { workspace = true, default-features = false }
tokio = { workspace = true }
pin-project-lite = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }
futures = { workspace = true, default-features = false, features = ["executor"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync", "macros"] }

[[bench]]
name = "pool"
Expand Down
9 changes: 5 additions & 4 deletions relay-threading/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use std::sync::Arc;
use crate::pool::{AsyncPool, Thread};
use crate::pool::{CustomSpawn, DefaultSpawn, ThreadSpawn};

/// Type alias for a thread safe closure that is used for panic handling across the code.
pub(crate) type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;

/// [`AsyncPoolBuilder`] provides a flexible way to configure and build an [`AsyncPool`] for executing
/// asynchronous tasks concurrently on dedicated threads.
///
Expand All @@ -14,10 +17,8 @@ use crate::pool::{CustomSpawn, DefaultSpawn, ThreadSpawn};
pub struct AsyncPoolBuilder<S = DefaultSpawn> {
pub(crate) runtime: tokio::runtime::Handle,
pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>,
#[allow(clippy::type_complexity)]
pub(crate) thread_panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>,
#[allow(clippy::type_complexity)]
pub(crate) task_panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>,
pub(crate) thread_panic_handler: Option<Arc<PanicHandler>>,
pub(crate) task_panic_handler: Option<Arc<PanicHandler>>,
pub(crate) spawn_handler: S,
pub(crate) num_threads: usize,
pub(crate) max_concurrency: usize,
Expand Down
15 changes: 5 additions & 10 deletions relay-threading/src/multiplexing.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::any::Any;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::PanicHandler;
use futures::future::CatchUnwind;
use futures::stream::{FusedStream, FuturesUnordered, Stream};
use futures::FutureExt;
Expand All @@ -19,16 +19,15 @@ pin_project! {
struct Tasks<F> {
#[pin]
futures: FuturesUnordered<Unconstrained<CatchUnwind<AssertUnwindSafe<F>>>>,
panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>,
panic_handler: Option<Arc<PanicHandler>>,
}
}

impl<F> Tasks<F> {
/// Creates a new task manager.
///
/// This internal constructor initializes a new collection for tracking asynchronous tasks.
#[allow(clippy::type_complexity)]
fn new(panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>) -> Self {
fn new(panic_handler: Option<Arc<PanicHandler>>) -> Self {
Self {
futures: FuturesUnordered::new(),
panic_handler,
Expand Down Expand Up @@ -118,12 +117,8 @@ where
///
/// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler
/// can be provided to manage errors during task execution.
#[allow(clippy::type_complexity)]
pub fn new(
max_concurrency: usize,
rx: S,
panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>,
) -> Self {
pub fn new(max_concurrency: usize, rx: S, panic_handler: Option<Arc<PanicHandler>>) -> Self {
Self {
max_concurrency,
rx,
Expand Down
31 changes: 24 additions & 7 deletions relay-threading/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::any::Any;
use std::future::Future;
use std::io;
use std::panic::AssertUnwindSafe;
Expand All @@ -9,6 +8,7 @@ use futures::FutureExt;

use crate::builder::AsyncPoolBuilder;
use crate::multiplexing::Multiplexed;
use crate::PanicHandler;

/// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads.
///
Expand Down Expand Up @@ -67,11 +67,12 @@ where
///
/// # Panics
///
/// This method panics if the internal task channel is unexpectedly closed.
/// This method panics if all receivers have been dropped which can happen when all threads of
/// the pool panicked.
pub fn spawn(&self, future: F) {
assert!(
self.tx.send(future).is_ok(),
"receiver never exits before sender"
"failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
);
}

Expand All @@ -81,11 +82,12 @@ where
///
/// # Panics
///
/// This method panics if the internal task channel is unexpectedly closed.
/// This method panics if all receivers have been dropped which can happen when all threads of
/// the pool panicked.
pub async fn spawn_async(&self, future: F) {
assert!(
self.tx.send_async(future).await.is_ok(),
"receiver never exits before sender"
"failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
);
}
}
Expand All @@ -96,8 +98,7 @@ pub struct Thread {
max_concurrency: usize,
name: Option<String>,
runtime: tokio::runtime::Handle,
#[allow(clippy::type_complexity)]
panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>,
panic_handler: Option<Arc<PanicHandler>>,
task: BoxFuture<'static, ()>,
}

Expand Down Expand Up @@ -206,6 +207,7 @@ where
#[cfg(test)]
mod tests {
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::AtomicBool;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -368,4 +370,19 @@ mod tests {

assert!(has_panicked.load(Ordering::SeqCst));
}

#[tokio::test]
async fn test_spawn_panics_if_no_threads_are_available() {
let pool = AsyncPoolBuilder::new(Handle::current())
.num_threads(0)
.max_concurrency(1)
.build()
.unwrap();

let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
pool.spawn(async move {});
}));

assert!(result.is_err());
}
}
1 change: 1 addition & 0 deletions relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
//! - [`relay-statsd`]: High-level StatsD metric client for internal measurements.
//! - [`relay-system`]: Foundational system components for Relay's services.
//! - [`relay-test`]: Helpers for testing the web server and services.
//! - [`relay-threading`]: Threading code that is used by Relay.
//! - [`relay-ua`]: User agent parser with built-in rules.
//!
//! # Tools
Expand Down

0 comments on commit ac4a5f8

Please sign in to comment.