Skip to content

Commit 31f032a

Browse files
authored
feat(threading): Add new relay-threading module (#4500)
1 parent 691f89c commit 31f032a

File tree

10 files changed

+1189
-0
lines changed

10 files changed

+1189
-0
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
**Features**:
6+
7+
- Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
8+
39
## 25.2.0
410

511
- Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471))

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ relay-server = { path = "relay-server" }
5656
relay-spans = { path = "relay-spans" }
5757
relay-statsd = { path = "relay-statsd" }
5858
relay-system = { path = "relay-system" }
59+
relay-threading = { path = "relay-threading" }
5960
relay-ua = { path = "relay-ua" }
6061
relay-test = { path = "relay-test" }
6162
relay-protocol-derive = { path = "relay-protocol-derive" }
@@ -98,6 +99,7 @@ enumset = "1.0.13"
9899
flate2 = "1.0.35"
99100
fnv = "1.0.7"
100101
futures = { version = "0.3", default-features = false, features = ["std"] }
102+
flume = { version = "0.11.1", default-features = false }
101103
globset = "0.4.15"
102104
hash32 = "0.3.1"
103105
hashbrown = "0.14.5"

relay-threading/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "relay-threading"
3+
authors = ["Sentry <[email protected]>"]
4+
description = "Threading code that is used by Relay"
5+
homepage = "https://getsentry.github.io/relay/"
6+
repository = "https://github.com/getsentry/relay"
7+
version = "25.2.0"
8+
edition = "2021"
9+
license-file = "../LICENSE.md"
10+
publish = false
11+
12+
[dependencies]
13+
flume = { workspace = true }
14+
futures = { workspace = true }
15+
tokio = { workspace = true }
16+
pin-project-lite = { workspace = true }
17+
18+
[dev-dependencies]
19+
criterion = { workspace = true, features = ["async_tokio"] }
20+
futures = { workspace = true, features = ["executor"] }
21+
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync", "macros"] }
22+
23+
[[bench]]
24+
name = "pool"
25+
harness = false

relay-threading/benches/pool.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::future::Future;
2+
use std::sync::atomic::{AtomicUsize, Ordering};
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
7+
use futures::future::{BoxFuture, FutureExt};
8+
use relay_threading::AsyncPoolBuilder;
9+
use tokio::runtime::Runtime;
10+
use tokio::sync::Semaphore;
11+
12+
struct BenchBarrier {
13+
semaphore: Arc<Semaphore>,
14+
count: usize,
15+
}
16+
17+
impl BenchBarrier {
18+
fn new(count: usize) -> Self {
19+
Self {
20+
semaphore: Arc::new(Semaphore::new(count)),
21+
count,
22+
}
23+
}
24+
25+
async fn spawn<F, Fut>(&self, pool: &relay_threading::AsyncPool<BoxFuture<'static, ()>>, f: F)
26+
where
27+
F: FnOnce() -> Fut + Send + 'static,
28+
Fut: Future<Output = ()> + Send + 'static,
29+
{
30+
let semaphore = self.semaphore.clone();
31+
let permit = semaphore.acquire_owned().await.unwrap();
32+
pool.spawn_async(
33+
async move {
34+
f().await;
35+
drop(permit);
36+
}
37+
.boxed(),
38+
)
39+
.await;
40+
}
41+
42+
async fn wait(&self) {
43+
let _ = self
44+
.semaphore
45+
.acquire_many(self.count as u32)
46+
.await
47+
.unwrap();
48+
}
49+
}
50+
51+
fn create_runtime() -> Runtime {
52+
tokio::runtime::Builder::new_multi_thread()
53+
.worker_threads(4)
54+
.enable_all()
55+
.build()
56+
.unwrap()
57+
}
58+
59+
async fn run_benchmark(pool: &relay_threading::AsyncPool<BoxFuture<'static, ()>>, count: usize) {
60+
let counter = Arc::new(AtomicUsize::new(0));
61+
let barrier = BenchBarrier::new(count);
62+
63+
// Spawn tasks
64+
for _ in 0..count {
65+
let counter = counter.clone();
66+
barrier
67+
.spawn(pool, move || async move {
68+
// Simulate some work
69+
tokio::time::sleep(Duration::from_micros(50)).await;
70+
counter.fetch_add(1, Ordering::SeqCst);
71+
})
72+
.await;
73+
}
74+
75+
// Wait for all tasks to complete
76+
barrier.wait().await;
77+
assert_eq!(counter.load(Ordering::SeqCst), count);
78+
}
79+
80+
fn bench_pool_scaling(c: &mut Criterion) {
81+
let runtime = create_runtime();
82+
let mut group = c.benchmark_group("pool_scaling");
83+
group.sampling_mode(criterion::SamplingMode::Flat);
84+
group.measurement_time(Duration::from_secs(10));
85+
86+
// Test with different numbers of threads
87+
for threads in [1, 2, 4, 8].iter() {
88+
let pool = AsyncPoolBuilder::new(runtime.handle().clone())
89+
.num_threads(*threads)
90+
.max_concurrency(100)
91+
.build()
92+
.unwrap();
93+
94+
// Test with different task counts
95+
for tasks in [100, 1000, 10000].iter() {
96+
group.bench_with_input(
97+
BenchmarkId::new(format!("threads_{}", threads), tasks),
98+
tasks,
99+
|b, &tasks| {
100+
b.to_async(&runtime).iter(|| run_benchmark(&pool, tasks));
101+
},
102+
);
103+
}
104+
}
105+
106+
group.finish();
107+
}
108+
109+
fn bench_multi_threaded_spawn(c: &mut Criterion) {
110+
let runtime = create_runtime();
111+
let mut group = c.benchmark_group("multi_threaded_spawn");
112+
group.sampling_mode(criterion::SamplingMode::Flat);
113+
group.measurement_time(Duration::from_secs(10));
114+
115+
// Test with different numbers of spawning threads
116+
for spawn_threads in [2, 4, 8].iter() {
117+
// Test with different task counts
118+
for tasks in [1000, 10000].iter() {
119+
group.bench_with_input(
120+
BenchmarkId::new(format!("spawn_threads_{}", spawn_threads), tasks),
121+
tasks,
122+
|b, &tasks| {
123+
b.to_async(&runtime).iter(|| async {
124+
let pool = Arc::new(
125+
AsyncPoolBuilder::new(runtime.handle().clone())
126+
.num_threads(4) // Fixed number of worker threads
127+
.max_concurrency(100)
128+
.build()
129+
.unwrap(),
130+
);
131+
132+
let tasks_per_thread = tasks / spawn_threads;
133+
let mut handles = Vec::new();
134+
135+
// Spawn tasks from multiple threads
136+
for _ in 0..*spawn_threads {
137+
let runtime = runtime.handle().clone();
138+
let pool = pool.clone();
139+
let handle = std::thread::spawn(move || {
140+
runtime.block_on(run_benchmark(&pool, tasks_per_thread));
141+
});
142+
handles.push(handle);
143+
}
144+
145+
// Wait for all spawning threads to complete
146+
for handle in handles {
147+
handle.join().unwrap();
148+
}
149+
});
150+
},
151+
);
152+
}
153+
}
154+
155+
group.finish();
156+
}
157+
158+
criterion_group!(benches, bench_pool_scaling, bench_multi_threaded_spawn);
159+
criterion_main!(benches);

relay-threading/src/builder.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use std::any::Any;
2+
use std::future::Future;
3+
use std::io;
4+
use std::sync::Arc;
5+
6+
use crate::pool::{AsyncPool, Thread};
7+
use crate::pool::{CustomSpawn, DefaultSpawn, ThreadSpawn};
8+
9+
/// Type alias for a thread safe closure that is used for panic handling across the code.
10+
pub(crate) type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
11+
12+
/// [`AsyncPoolBuilder`] provides a flexible way to configure and build an [`AsyncPool`] for executing
13+
/// asynchronous tasks concurrently on dedicated threads.
14+
///
15+
/// This builder enables you to customize the number of threads, concurrency limits, thread naming,
16+
/// and panic handling strategies.
17+
pub struct AsyncPoolBuilder<S = DefaultSpawn> {
18+
pub(crate) runtime: tokio::runtime::Handle,
19+
pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>,
20+
pub(crate) thread_panic_handler: Option<Arc<PanicHandler>>,
21+
pub(crate) task_panic_handler: Option<Arc<PanicHandler>>,
22+
pub(crate) spawn_handler: S,
23+
pub(crate) num_threads: usize,
24+
pub(crate) max_concurrency: usize,
25+
}
26+
27+
impl AsyncPoolBuilder<DefaultSpawn> {
28+
/// Initializes a new [`AsyncPoolBuilder`] with default settings.
29+
///
30+
/// The builder is tied to the provided [`tokio::runtime::Handle`] and prepares to configure an [`AsyncPool`].
31+
pub fn new(runtime: tokio::runtime::Handle) -> AsyncPoolBuilder<DefaultSpawn> {
32+
AsyncPoolBuilder {
33+
runtime,
34+
thread_name: None,
35+
thread_panic_handler: None,
36+
task_panic_handler: None,
37+
spawn_handler: DefaultSpawn,
38+
num_threads: 1,
39+
max_concurrency: 1,
40+
}
41+
}
42+
}
43+
44+
impl<S> AsyncPoolBuilder<S>
45+
where
46+
S: ThreadSpawn,
47+
{
48+
/// Specifies a custom naming convention for threads in the [`AsyncPool`].
49+
///
50+
/// The provided closure receives the thread's index and returns a name,
51+
/// which can be useful for debugging and logging.
52+
pub fn thread_name<F>(mut self, thread_name: F) -> Self
53+
where
54+
F: FnMut(usize) -> String + 'static,
55+
{
56+
self.thread_name = Some(Box::new(thread_name));
57+
self
58+
}
59+
60+
/// Sets a custom panic handler for threads in the [`AsyncPool`].
61+
///
62+
/// If a thread panics, the provided handler will be invoked so that you can perform
63+
/// custom error handling or cleanup.
64+
pub fn thread_panic_handler<F>(mut self, panic_handler: F) -> Self
65+
where
66+
F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
67+
{
68+
self.thread_panic_handler = Some(Arc::new(panic_handler));
69+
self
70+
}
71+
72+
/// Sets a custom panic handler for tasks executed by the [`AsyncPool`].
73+
///
74+
/// This handler is used to manage panics that occur during task execution, allowing for graceful
75+
/// error handling.
76+
pub fn task_panic_handler<F>(mut self, panic_handler: F) -> Self
77+
where
78+
F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
79+
{
80+
self.task_panic_handler = Some(Arc::new(panic_handler));
81+
self
82+
}
83+
84+
/// Configures a custom thread spawning procedure for the [`AsyncPool`].
85+
///
86+
/// This method allows you to adjust thread settings (e.g. naming, stack size) before thread creation,
87+
/// making it possible to apply application-specific configurations.
88+
pub fn spawn_handler<F>(self, spawn_handler: F) -> AsyncPoolBuilder<CustomSpawn<F>>
89+
where
90+
F: FnMut(Thread) -> io::Result<()>,
91+
{
92+
AsyncPoolBuilder {
93+
runtime: self.runtime,
94+
thread_name: self.thread_name,
95+
thread_panic_handler: self.thread_panic_handler,
96+
task_panic_handler: self.task_panic_handler,
97+
spawn_handler: CustomSpawn::new(spawn_handler),
98+
num_threads: self.num_threads,
99+
max_concurrency: self.max_concurrency,
100+
}
101+
}
102+
103+
/// Sets the number of worker threads for the [`AsyncPool`].
104+
///
105+
/// This determines how many dedicated threads will be available for running tasks concurrently.
106+
pub fn num_threads(mut self, num_threads: usize) -> Self {
107+
self.num_threads = num_threads;
108+
self
109+
}
110+
111+
/// Sets the maximum number of concurrent tasks per thread in the [`AsyncPool`].
112+
///
113+
/// This controls how many futures can be polled simultaneously on each worker thread.
114+
pub fn max_concurrency(mut self, max_concurrency: usize) -> Self {
115+
self.max_concurrency = max_concurrency;
116+
self
117+
}
118+
119+
/// Constructs an [`AsyncPool`] based on the configured settings.
120+
///
121+
/// Finalizing the builder sets up dedicated worker threads and configures the executor
122+
/// to enforce the specified concurrency limits.
123+
pub fn build<F>(self) -> Result<AsyncPool<F>, io::Error>
124+
where
125+
F: Future<Output = ()> + Send + 'static,
126+
{
127+
AsyncPool::new(self)
128+
}
129+
}

0 commit comments

Comments
 (0)