From ac4a5f8fde06f174b0d1716c31aba70bab27ed81 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 19 Feb 2025 10:32:34 +0100 Subject: [PATCH] Fix --- Cargo.lock | 2 +- relay-threading/Cargo.toml | 10 ++++++---- relay-threading/src/builder.rs | 9 +++++---- relay-threading/src/multiplexing.rs | 15 +++++--------- relay-threading/src/pool.rs | 31 ++++++++++++++++++++++------- relay/src/main.rs | 1 + 6 files changed, 42 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a9d9297592..94e98f31a34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4093,7 +4093,7 @@ dependencies = [ [[package]] name = "relay-threading" -version = "25.1.0" +version = "25.2.0" dependencies = [ "criterion", "flume", diff --git a/relay-threading/Cargo.toml b/relay-threading/Cargo.toml index c10f6feec39..0dc237890b2 100644 --- a/relay-threading/Cargo.toml +++ b/relay-threading/Cargo.toml @@ -4,19 +4,21 @@ authors = ["Sentry "] description = "Threading code that is used by Relay" homepage = "https://getsentry.github.io/relay/" repository = "https://github.com/getsentry/relay" -version = "25.1.0" +version = "25.2.0" edition = "2021" license-file = "../LICENSE.md" publish = false [dependencies] -flume = { workspace = true } -futures = { workspace = true, features = ["executor"] } -tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync", "macros"] } +flume = { workspace = true, default-features = false } +futures = { workspace = true, default-features = false } +tokio = { workspace = true } pin-project-lite = { workspace = true } [dev-dependencies] criterion = { workspace = true, features = ["async_tokio"] } +futures = { workspace = true, default-features = false, features = ["executor"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync", "macros"] } [[bench]] name = "pool" diff --git a/relay-threading/src/builder.rs b/relay-threading/src/builder.rs index ee5315de310..b0bc3d3bbcb 100644 --- a/relay-threading/src/builder.rs +++ b/relay-threading/src/builder.rs @@ -6,6 +6,9 @@ use std::sync::Arc; use crate::pool::{AsyncPool, Thread}; use crate::pool::{CustomSpawn, DefaultSpawn, ThreadSpawn}; +/// Type alias for a thread safe closure that is used for panic handling across the code. +pub(crate) type PanicHandler = dyn Fn(Box) + Send + Sync; + /// [`AsyncPoolBuilder`] provides a flexible way to configure and build an [`AsyncPool`] for executing /// asynchronous tasks concurrently on dedicated threads. /// @@ -14,10 +17,8 @@ use crate::pool::{CustomSpawn, DefaultSpawn, ThreadSpawn}; pub struct AsyncPoolBuilder { pub(crate) runtime: tokio::runtime::Handle, pub(crate) thread_name: Option String>>, - #[allow(clippy::type_complexity)] - pub(crate) thread_panic_handler: Option) + Send + Sync>>, - #[allow(clippy::type_complexity)] - pub(crate) task_panic_handler: Option) + Send + Sync>>, + pub(crate) thread_panic_handler: Option>, + pub(crate) task_panic_handler: Option>, pub(crate) spawn_handler: S, pub(crate) num_threads: usize, pub(crate) max_concurrency: usize, diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs index ee742a6a2bb..aef76f54b21 100644 --- a/relay-threading/src/multiplexing.rs +++ b/relay-threading/src/multiplexing.rs @@ -1,10 +1,10 @@ -use std::any::Any; use std::future::Future; use std::panic::AssertUnwindSafe; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::PanicHandler; use futures::future::CatchUnwind; use futures::stream::{FusedStream, FuturesUnordered, Stream}; use futures::FutureExt; @@ -19,7 +19,7 @@ pin_project! { struct Tasks { #[pin] futures: FuturesUnordered>>>, - panic_handler: Option) + Send + Sync>>, + panic_handler: Option>, } } @@ -27,8 +27,7 @@ impl Tasks { /// Creates a new task manager. /// /// This internal constructor initializes a new collection for tracking asynchronous tasks. - #[allow(clippy::type_complexity)] - fn new(panic_handler: Option) + Send + Sync>>) -> Self { + fn new(panic_handler: Option>) -> Self { Self { futures: FuturesUnordered::new(), panic_handler, @@ -118,12 +117,8 @@ where /// /// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler /// can be provided to manage errors during task execution. - #[allow(clippy::type_complexity)] - pub fn new( - max_concurrency: usize, - rx: S, - panic_handler: Option) + Send + Sync>>, - ) -> Self { + + pub fn new(max_concurrency: usize, rx: S, panic_handler: Option>) -> Self { Self { max_concurrency, rx, diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs index 92dcf5323ae..a0163b19e10 100644 --- a/relay-threading/src/pool.rs +++ b/relay-threading/src/pool.rs @@ -1,4 +1,3 @@ -use std::any::Any; use std::future::Future; use std::io; use std::panic::AssertUnwindSafe; @@ -9,6 +8,7 @@ use futures::FutureExt; use crate::builder::AsyncPoolBuilder; use crate::multiplexing::Multiplexed; +use crate::PanicHandler; /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads. /// @@ -67,11 +67,12 @@ where /// /// # Panics /// - /// This method panics if the internal task channel is unexpectedly closed. + /// This method panics if all receivers have been dropped which can happen when all threads of + /// the pool panicked. pub fn spawn(&self, future: F) { assert!( self.tx.send(future).is_ok(), - "receiver never exits before sender" + "failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)" ); } @@ -81,11 +82,12 @@ where /// /// # Panics /// - /// This method panics if the internal task channel is unexpectedly closed. + /// This method panics if all receivers have been dropped which can happen when all threads of + /// the pool panicked. pub async fn spawn_async(&self, future: F) { assert!( self.tx.send_async(future).await.is_ok(), - "receiver never exits before sender" + "failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)" ); } } @@ -96,8 +98,7 @@ pub struct Thread { max_concurrency: usize, name: Option, runtime: tokio::runtime::Handle, - #[allow(clippy::type_complexity)] - panic_handler: Option) + Send + Sync>>, + panic_handler: Option>, task: BoxFuture<'static, ()>, } @@ -206,6 +207,7 @@ where #[cfg(test)] mod tests { use std::future::Future; + use std::panic::AssertUnwindSafe; use std::sync::atomic::AtomicBool; use std::sync::{ atomic::{AtomicUsize, Ordering}, @@ -368,4 +370,19 @@ mod tests { assert!(has_panicked.load(Ordering::SeqCst)); } + + #[tokio::test] + async fn test_spawn_panics_if_no_threads_are_available() { + let pool = AsyncPoolBuilder::new(Handle::current()) + .num_threads(0) + .max_concurrency(1) + .build() + .unwrap(); + + let result = std::panic::catch_unwind(AssertUnwindSafe(|| { + pool.spawn(async move {}); + })); + + assert!(result.is_err()); + } } diff --git a/relay/src/main.rs b/relay/src/main.rs index 0a5b2713923..539fbcaacf6 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -63,6 +63,7 @@ //! - [`relay-statsd`]: High-level StatsD metric client for internal measurements. //! - [`relay-system`]: Foundational system components for Relay's services. //! - [`relay-test`]: Helpers for testing the web server and services. +//! - [`relay-threading`]: Threading code that is used by Relay. //! - [`relay-ua`]: User agent parser with built-in rules. //! //! # Tools