Skip to content

Commit 0431825

Browse files
committed
Update docs
1 parent ab8af51 commit 0431825

File tree

4 files changed

+102
-87
lines changed

4 files changed

+102
-87
lines changed

relay-threading/src/builder.rs

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ use std::sync::Arc;
66
use crate::pool::{AsyncPool, Thread};
77
use crate::pool::{CustomSpawn, DefaultSpawn, ThreadSpawn};
88

9-
/// A builder for constructing an [`AsyncPool`] with custom configurations.
9+
/// [`AsyncPoolBuilder`] provides a flexible way to configure and build an [`AsyncPool`] for executing
10+
/// asynchronous tasks concurrently on dedicated threads.
1011
///
11-
/// Use this builder to fine-tune the performance and threading behavior of your asynchronous
12-
/// task pool.
12+
/// This builder enables you to customize the number of threads, concurrency limits, thread naming,
13+
/// and panic handling strategies.
1314
pub struct AsyncPoolBuilder<S = DefaultSpawn> {
1415
pub(crate) runtime: tokio::runtime::Handle,
1516
pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>,
@@ -23,7 +24,9 @@ pub struct AsyncPoolBuilder<S = DefaultSpawn> {
2324
}
2425

2526
impl AsyncPoolBuilder<DefaultSpawn> {
26-
/// Creates a new [`AsyncPoolBuilder`] with default settings.
27+
/// Initializes a new [`AsyncPoolBuilder`] with default settings.
28+
///
29+
/// The builder is tied to the provided [`tokio::runtime::Handle`] and prepares to configure an [`AsyncPool`].
2730
pub fn new(runtime: tokio::runtime::Handle) -> AsyncPoolBuilder<DefaultSpawn> {
2831
AsyncPoolBuilder {
2932
runtime,
@@ -41,8 +44,10 @@ impl<S> AsyncPoolBuilder<S>
4144
where
4245
S: ThreadSpawn,
4346
{
44-
/// Specifies a custom hook to generate the thread name given the thread index within the
45-
/// [`AsyncPool`].
47+
/// Specifies a custom naming convention for threads in the [`AsyncPool`].
48+
///
49+
/// The provided closure receives the thread's index and returns a name,
50+
/// which can be useful for debugging and logging.
4651
pub fn thread_name<F>(mut self, thread_name: F) -> Self
4752
where
4853
F: FnMut(usize) -> String + 'static,
@@ -51,11 +56,14 @@ where
5156
self
5257
}
5358

54-
/// Specifies a custom hook to handle the panic happening in one of the threads of the
55-
/// [`AsyncPool`].
59+
/// Sets a custom panic handler for threads in the [`AsyncPool`].
60+
///
61+
/// If a thread panics, the provided handler will be invoked so that you can perform
62+
/// custom error handling or cleanup.
5663
///
57-
/// The `panic_handler` is called once for each panicking thread with the error caught in the
58-
/// panicking as argument.
64+
/// # Panics
65+
///
66+
/// In the absence of this handler, thread panics will propagate.
5967
pub fn thread_panic_handler<F>(mut self, panic_handler: F) -> Self
6068
where
6169
F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
@@ -64,11 +72,14 @@ where
6472
self
6573
}
6674

67-
/// Specifies a custom hook to handle the panic happening in one of the tasks of the
68-
/// [`AsyncPool`].
75+
/// Sets a custom panic handler for tasks executed by the [`AsyncPool`].
76+
///
77+
/// This handler is used to manage panics that occur during task execution, allowing for graceful
78+
/// error handling.
6979
///
70-
/// The `panic_handler` is called once for each panicking task with the error caught in the
71-
/// panicking as argument.
80+
/// # Panics
81+
///
82+
/// Without a handler, panics in tasks will propagate.
7283
pub fn task_panic_handler<F>(mut self, panic_handler: F) -> Self
7384
where
7485
F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
@@ -77,7 +88,10 @@ where
7788
self
7889
}
7990

80-
/// Specifies a custom hook to dynamically adjust thread settings for the [`AsyncPool`].
91+
/// Configures a custom thread spawning procedure for the [`AsyncPool`].
92+
///
93+
/// This method allows you to adjust thread settings (e.g. naming, stack size) before thread creation,
94+
/// making it possible to apply application-specific configurations.
8195
pub fn spawn_handler<F>(self, spawn_handler: F) -> AsyncPoolBuilder<CustomSpawn<F>>
8296
where
8397
F: FnMut(Thread) -> io::Result<()>,
@@ -93,21 +107,26 @@ where
93107
}
94108
}
95109

96-
/// Sets the number of executor threads for running tasks in the [`AsyncPool`].
110+
/// Sets the number of worker threads for the [`AsyncPool`].
111+
///
112+
/// This determines how many dedicated threads will be available for running tasks concurrently.
97113
pub fn num_threads(mut self, num_threads: usize) -> Self {
98114
self.num_threads = num_threads;
99115
self
100116
}
101117

102-
/// Adjusts the maximum number of concurrent tasks allowed per executor in the [`AsyncPool`].
118+
/// Sets the maximum number of concurrent tasks per thread in the [`AsyncPool`].
103119
///
104-
/// The max concurrency determines how many futures can be polled simultaneously.
120+
/// This controls how many futures can be polled simultaneously on each worker thread.
105121
pub fn max_concurrency(mut self, max_concurrency: usize) -> Self {
106122
self.max_concurrency = max_concurrency;
107123
self
108124
}
109125

110-
/// Finalizes the configuration and constructs an operational [`AsyncPool`] for executing tasks.
126+
/// Constructs an [`AsyncPool`] based on the configured settings.
127+
///
128+
/// Finalizing the builder sets up dedicated worker threads and configures the executor
129+
/// to enforce the specified concurrency limits.
111130
pub fn build<F>(self) -> Result<AsyncPool<F>, io::Error>
112131
where
113132
F: Future<Output = ()> + Send + 'static,

relay-threading/src/lib.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,6 @@
2222
//! backpressure - when workers are overwhelmed, task submission will block until capacity becomes
2323
//! available, preventing resource exhaustion.
2424
//!
25-
//! ## Modules
26-
//!
27-
//! - **builder**: Contains the [`AsyncPoolBuilder`] which configures and constructs an [`AsyncPool`].
28-
//! - **multiplexing**: Provides the [`Multiplexed`] future that manages and executes multiple asynchronous tasks concurrently.
29-
//! - **pool**: Implements the [`AsyncPool`] and related types, such as [`Thread`]. Together they encapsulate the logic
30-
//! for scheduling tasks across multiple threads.
31-
//!
3225
//! ## Usage Example
3326
//!
3427
//! ```rust
@@ -55,11 +48,6 @@
5548
//!
5649
//! Both the async pool and its task multiplexer support custom panic handlers, allowing graceful
5750
//! recovery from panics in either thread execution or individual tasks.
58-
//!
59-
//! For more details on specific functionalities, refer to the documentation in the submodules:
60-
//! - [`builder`]
61-
//! - [`multiplexing`]
62-
//! - [`pool`]
6351
6452
mod builder;
6553
mod multiplexing;

relay-threading/src/multiplexing.rs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use pin_project_lite::pin_project;
1212
use tokio::task::Unconstrained;
1313

1414
pin_project! {
15-
/// Manages a collection of asynchronous tasks that are executed concurrently.
15+
/// Manages concurrent execution of asynchronous tasks.
1616
///
17-
/// This helper is designed for use within the [`Multiplexed`] executor to schedule and drive tasks
18-
/// while respecting a set concurrency limit.
17+
/// This internal structure collects and drives futures concurrently, invoking a panic handler (if provided)
18+
/// when a task encounters a panic.
1919
struct Tasks<F> {
2020
#[pin]
2121
futures: FuturesUnordered<Unconstrained<CatchUnwind<AssertUnwindSafe<F>>>>,
@@ -24,9 +24,9 @@ pin_project! {
2424
}
2525

2626
impl<F> Tasks<F> {
27-
/// Initializes a new [`Tasks`] collection for managing asynchronous tasks.
27+
/// Creates a new task manager.
2828
///
29-
/// This collection is used internally by [`Multiplexed`] to schedule task execution.
29+
/// This internal constructor initializes a new collection for tracking asynchronous tasks.
3030
#[allow(clippy::type_complexity)]
3131
fn new(panic_handler: Option<Arc<dyn Fn(Box<dyn Any + Send>) + Send + Sync>>) -> Self {
3232
Self {
@@ -50,21 +50,16 @@ impl<F> Tasks<F>
5050
where
5151
F: Future<Output = ()>,
5252
{
53-
/// Adds a new asynchronous task to the collection.
54-
///
55-
/// Use this method to submit additional tasks for concurrent execution.
53+
/// Adds a future to the collection for concurrent execution.
5654
fn push(&mut self, future: F) {
5755
let future = AssertUnwindSafe(future).catch_unwind();
5856
self.futures.push(tokio::task::unconstrained(future));
5957
}
6058

61-
/// Drives the scheduled tasks until one remains pending.
62-
///
63-
/// This method advances the execution of managed tasks until it encounters a task
64-
/// that is not immediately ready, ensuring that completed tasks are processed and the
65-
/// multiplexer can schedule new ones.
59+
/// Drives the execution of collected tasks until a pending state is encountered.
6660
///
67-
/// For any task that panics, the `panic_handler` callback will be called.
61+
/// If a future panics and a panic handler is provided, the handler is invoked.
62+
/// Otherwise, the panic is propagated.
6863
fn poll_tasks_until_pending(self: Pin<&mut Self>, cx: &mut Context<'_>) {
6964
let mut this = self.project();
7065

@@ -98,12 +93,14 @@ where
9893
}
9994

10095
pin_project! {
101-
/// A task multiplexer that concurrently executes asynchronous tasks while limiting the maximum
102-
/// number of tasks running simultaneously.
96+
/// [`Multiplexed`] is a future that concurrently schedules asynchronous tasks from a stream while ensuring that
97+
/// the number of concurrently executing tasks does not exceed a specified limit.
98+
///
99+
/// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads.
103100
///
104-
/// The [`Multiplexed`] executor retrieves tasks from a stream and schedules them according to the
105-
/// specified concurrency limit. The multiplexer completes once all tasks have been executed and
106-
/// no further tasks are available.
101+
/// # Panics
102+
///
103+
/// If any task panics without a custom panic handler, the panic will propagate.
107104
pub struct Multiplexed<S, F> {
108105
max_concurrency: usize,
109106
#[pin]
@@ -112,14 +109,15 @@ pin_project! {
112109
tasks: Tasks<F>,
113110
}
114111
}
112+
115113
impl<S, F> Multiplexed<S, F>
116114
where
117115
S: Stream<Item = F>,
118116
{
119-
/// Constructs a new [`Multiplexed`] executor with a concurrency limit and a stream of tasks.
117+
/// Creates a new [`Multiplexed`] instance with a defined concurrency limit and a stream of tasks.
120118
///
121-
/// This multiplexer should be awaited until completion, at which point all submitted tasks
122-
/// will have been executed.
119+
/// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler
120+
/// can be provided to manage errors during task execution.
123121
#[allow(clippy::type_complexity)]
124122
pub fn new(
125123
max_concurrency: usize,
@@ -141,12 +139,10 @@ where
141139
{
142140
type Output = ();
143141

144-
/// Drives the execution of the multiplexer by advancing scheduled tasks and fetching new ones.
142+
/// Polls the [`Multiplexed`] future to drive task execution.
145143
///
146-
/// This method polls the collection of tasks and retrieves additional tasks from the stream until
147-
/// the concurrency limit is reached or no more tasks are available. It yields `Poll::Pending` when
148-
/// there is ongoing work and returns `Poll::Ready(())` only once the task stream is exhausted
149-
/// and no active tasks remain.
144+
/// This method repeatedly schedules new tasks from the stream while enforcing the concurrency limit.
145+
/// It completes when the stream is exhausted and no active tasks remain.
150146
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151147
let mut this = self.project();
152148

0 commit comments

Comments
 (0)