
Commit 1e0908e

iunanua and claude committed
fix(shared-runtime): fork recovery, shutdown, runtime_handle
Three issues:

- after_fork_parent / after_fork_child used `?` inside the restart loop, so a
  single InvalidState worker silenced every other component in the fork child.
  Now log-and-continue, matching before_fork.
- spawn_worker after shutdown silently registered a dead worker: there was no
  shutdown state distinct from "no runtime yet". Add `shutdown: AtomicBool`,
  set it in shutdown / shutdown_async, and check it under the workers lock in
  spawn_worker so the during-shutdown race is also covered. New
  SharedRuntimeError::AlreadyShutdown + FFI mapping.
- runtime_handle() handed callers an owned tokio Handle that bypassed every
  fork-safety guarantee the crate exists to provide. Replace with
  with_runtime_context(closure), which scopes the entered runtime to a
  synchronous closure; migrate the lone production caller in the trace
  exporter builder.

Regression tests for the first two are un-ignored and assert the correct
behavior. The spawn_worker/before_fork stress test for the previously-fixed
TOCTOU stays #[ignore]'d for regression hunting.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0a3304c commit 1e0908e
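
For orientation before the diffs, this is the caller-visible shape of the runtime_handle → with_runtime_context migration, sketched with a hypothetical `init_sync()` initializer that needs `Handle::current()` (the name is illustrative, not from this commit):

    // Before: an owned Handle escaped SharedRuntime's fork tracking.
    // let _guard = shared_runtime.runtime_handle()?.enter();
    // let client = init_sync();

    // After: the entered runtime context is scoped to the closure and
    // cannot outlive it.
    let client = shared_runtime.with_runtime_context(init_sync)?;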

3 files changed: 152 additions & 20 deletions

libdd-data-pipeline/src/trace_exporter/builder.rs

Lines changed: 6 additions & 9 deletions
@@ -320,17 +320,14 @@ impl TraceExporterBuilder {
         let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));
 
         // On native, `C::new_client()` may capture `tokio::runtime::Handle::current()`
-        // internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context
-        // so that handle is available. On wasm this is a no-op — the JS event loop is
-        // always the implicit executor.
-        #[cfg(not(target_arch = "wasm32"))]
-        let _guard = shared_runtime
-            .runtime_handle()
+        // internally (e.g. `NativeCapabilities`). Run it inside the SharedRuntime's
+        // tokio context so that handle is available. On wasm this is a no-op — the
+        // JS event loop is always the implicit executor.
+        let capabilities = shared_runtime
+            .with_runtime_context(C::new_client)
             .map_err(|e| {
                 TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
-            })?
-            .enter();
-        let capabilities = C::new_client();
+            })?;
 
         // --- Platform-specific worker setup ---
         // The blocks below spawn background workers via `SharedRuntime`. On
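
A minimal self-contained sketch of the new builder-side pattern, assuming only what the diff shows (`SharedRuntime::new`, `with_runtime_context`) and that the crate name follows the directory; `Client` is a stand-in for the builder's `C::new_client()`:

    use libdd_shared_runtime::SharedRuntime;

    // Stand-in for `C::new_client()` above.
    struct Client;

    impl Client {
        fn new() -> Self {
            // Captures the ambient tokio handle, as `C::new_client()` may do
            // internally.
            let _handle = tokio::runtime::Handle::current();
            Client
        }
    }

    fn build() -> Client {
        let shared_runtime = SharedRuntime::new().expect("failed to create runtime");
        // The entered runtime context lives only for the duration of the
        // closure, so no owned Handle escapes SharedRuntime's fork-safety
        // bookkeeping.
        shared_runtime
            .with_runtime_context(Client::new)
            .expect("runtime unavailable or shut down")
    }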

libdd-shared-runtime-ffi/src/shared_runtime.rs

Lines changed: 3 additions & 0 deletions
@@ -15,6 +15,8 @@ pub enum SharedRuntimeErrorCode {
     InvalidArgument,
     /// The runtime is not available or in an invalid state.
     RuntimeUnavailable,
+    /// Operation rejected because the runtime has already been shut down.
+    AlreadyShutdown,
     /// Failed to acquire a lock on internal state.
     LockFailed,
     /// A worker operation failed.
@@ -50,6 +52,7 @@ impl From<SharedRuntimeError> for SharedRuntimeFFIError {
     fn from(err: SharedRuntimeError) -> Self {
         let code = match &err {
             SharedRuntimeError::RuntimeUnavailable => SharedRuntimeErrorCode::RuntimeUnavailable,
+            SharedRuntimeError::AlreadyShutdown => SharedRuntimeErrorCode::AlreadyShutdown,
             SharedRuntimeError::LockFailed(_) => SharedRuntimeErrorCode::LockFailed,
             SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError,
             SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation,
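
One reason the distinct code matters at the FFI boundary, sketched (the handler function is hypothetical, not part of this commit): `AlreadyShutdown` is terminal for the process, while `RuntimeUnavailable` can also mean a fork window and may clear up.

    fn on_spawn_error(code: SharedRuntimeErrorCode) {
        match code {
            // Terminal: shutdown was requested; no later spawn can succeed.
            SharedRuntimeErrorCode::AlreadyShutdown => { /* stop retrying */ }
            // Possibly transient: e.g. the runtime is parked around a fork.
            SharedRuntimeErrorCode::RuntimeUnavailable => { /* may retry */ }
            _ => { /* surface other errors to the caller */ }
        }
    }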

libdd-shared-runtime/src/shared_runtime/mod.rs

Lines changed: 143 additions & 11 deletions
@@ -14,7 +14,7 @@ use crate::worker::Worker;
 use futures::stream::{FuturesUnordered, StreamExt};
 use libdd_common::MutexExt;
 use pausable_worker::{PausableWorker, PausableWorkerError};
-use std::sync::atomic::AtomicU64;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::{fmt, io};
 use tracing::{debug, error};
@@ -26,7 +26,6 @@ use tracing::{debug, error};
 mod native {
     use super::*;
     use pausable_worker::tokio_spawn_fn;
-    use std::sync::atomic::Ordering;
     use tokio::runtime::{Builder, Runtime};
 
     fn build_runtime() -> Result<Runtime, io::Error> {
@@ -42,21 +41,39 @@
             runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))),
             workers: Arc::new(Mutex::new(Vec::new())),
             next_worker_id: AtomicU64::new(1),
+            shutdown: AtomicBool::new(false),
         })
     }
 
-    /// Returns a clone of the tokio runtime handle managed by this SharedRuntime.
+    /// Run `f` with the shared tokio runtime entered as the current context.
+    ///
+    /// Useful for synchronous initialization that calls into
+    /// [`tokio::runtime::Handle::current()`] (e.g., constructing an HTTP
+    /// client that captures the current handle internally).
+    ///
+    /// # Fork safety
+    /// Tasks spawned via `tokio::spawn` / `Handle::current().spawn(...)`
+    /// inside `f` are NOT tracked by `SharedRuntime`: they will not be
+    /// paused before fork, restarted after fork, or shut down by
+    /// [`Self::shutdown`]. For background work, register a
+    /// [`crate::Worker`] via [`Self::spawn_worker`] instead.
     ///
     /// # Errors
-    /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down.
-    pub fn runtime_handle(&self) -> Result<tokio::runtime::Handle, SharedRuntimeError> {
-        Ok(self
+    /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime
+    /// has been shut down or is in a fork window.
+    pub fn with_runtime_context<F, T>(&self, f: F) -> Result<T, SharedRuntimeError>
+    where
+        F: FnOnce() -> T,
+    {
+        let handle = self
            .runtime
            .lock_or_panic()
            .as_ref()
            .ok_or(SharedRuntimeError::RuntimeUnavailable)?
            .handle()
-            .clone())
+            .clone();
+        let _guard = handle.enter();
+        Ok(f())
    }
 
    /// Spawn a PausableWorker on this runtime.
@@ -89,6 +106,14 @@
        let runtime_guard = self.runtime.lock_or_panic();
        let mut workers_guard = self.workers.lock_or_panic();
 
+        // Reject post-shutdown spawns under the workers lock — this is the
+        // same lock `shutdown_async` acquires before draining, so once
+        // shutdown wins the workers lock, every subsequent spawn observes
+        // the flag and bails instead of silently registering a dead worker.
+        if self.shutdown.load(Ordering::Acquire) {
+            return Err(SharedRuntimeError::AlreadyShutdown);
+        }
+
        if let Some(rt) = runtime_guard.as_ref() {
            if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) {
                return Err(e.into());
@@ -165,8 +190,17 @@
 
        let mut workers_lock = self.workers.lock_or_panic();
 
+        // Log-and-continue: a single worker in `InvalidState` (e.g. its
+        // previous task was aborted) must not abort the whole restart
+        // loop and leave every other component dead. This matches the
+        // failure-tolerance pattern already used by `before_fork`.
        for worker_entry in workers_lock.iter_mut() {
-            worker_entry.worker.start(tokio_spawn_fn(&handle))?;
+            if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) {
+                error!(
+                    worker_id = worker_entry.id,
+                    "Worker failed to restart after fork in parent: {:?}", e
+                );
+            }
        }
 
        Ok(())
@@ -196,9 +230,17 @@
 
        workers_lock.retain(|entry| entry.restart_on_fork);
 
+        // Log-and-continue: see `after_fork_parent`. In the child this
+        // matters even more — a single InvalidState worker must not
+        // silence every other component for the lifetime of the process.
        for worker_entry in workers_lock.iter_mut() {
            worker_entry.worker.reset();
-            worker_entry.worker.start(tokio_spawn_fn(&handle))?;
+            if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) {
+                error!(
+                    worker_id = worker_entry.id,
+                    "Worker failed to restart after fork in child: {:?}", e
+                );
+            }
        }
 
        Ok(())
@@ -234,6 +276,7 @@
        timeout: Option<std::time::Duration>,
    ) -> Result<(), SharedRuntimeError> {
        debug!(?timeout, "Shutting down SharedRuntime");
+        self.shutdown.store(true, Ordering::Release);
        match self.runtime.lock_or_panic().take() {
            Some(runtime) => {
                if let Some(timeout) = timeout {
@@ -337,6 +380,8 @@ impl WorkerHandle {
 pub enum SharedRuntimeError {
     /// The runtime is not available or in an invalid state.
     RuntimeUnavailable,
+    /// Operation rejected because the runtime has already been shut down.
+    AlreadyShutdown,
     /// Failed to acquire a lock on internal state.
     LockFailed(String),
     /// A worker operation failed.
@@ -353,6 +398,7 @@ impl fmt::Display for SharedRuntimeError {
             Self::RuntimeUnavailable => {
                 write!(f, "Runtime is not available or in an invalid state")
             }
+            Self::AlreadyShutdown => write!(f, "Runtime has already been shut down"),
             Self::LockFailed(msg) => write!(f, "Failed to acquire lock: {}", msg),
             Self::WorkerError(err) => write!(f, "Worker error: {}", err),
             Self::RuntimeCreation(err) => {
@@ -397,6 +443,10 @@ pub struct SharedRuntime {
     runtime: Arc<Mutex<Option<Arc<tokio::runtime::Runtime>>>>,
     workers: Arc<Mutex<Vec<WorkerEntry>>>,
     next_worker_id: AtomicU64,
+    /// Set once `shutdown` / `shutdown_async` is called. After this point
+    /// `spawn_worker` rejects with `AlreadyShutdown` instead of silently
+    /// registering a worker that will never run.
+    shutdown: AtomicBool,
 }
 
 impl SharedRuntime {
@@ -419,6 +469,7 @@ impl SharedRuntime {
         Ok(Self {
             workers: Arc::new(Mutex::new(Vec::new())),
             next_worker_id: AtomicU64::new(1),
+            shutdown: AtomicBool::new(false),
         })
     }
 }
@@ -430,14 +481,16 @@
         worker: T,
         restart_on_fork: bool,
     ) -> Result<WorkerHandle, SharedRuntimeError> {
-        use std::sync::atomic::Ordering;
-
         let boxed_worker: BoxedWorker = Box::new(worker);
         debug!(?boxed_worker, "Spawning worker on SharedRuntime");
         let mut pausable_worker = PausableWorker::new(boxed_worker);
 
         let mut workers_guard = self.workers.lock_or_panic();
 
+        if self.shutdown.load(Ordering::Acquire) {
+            return Err(SharedRuntimeError::AlreadyShutdown);
+        }
+
         if let Err(e) = pausable_worker.start(|future| {
             use futures_util::FutureExt;
             let (remote, handle) = future.remote_handle();
@@ -461,6 +514,17 @@
         })
     }
 
+    /// On wasm32, [`Self::with_runtime_context`] is a no-op — the JS event
+    /// loop is the implicit executor, so there is no tokio context to enter.
+    /// The closure is invoked unchanged so callers can be platform-agnostic.
+    #[cfg(target_arch = "wasm32")]
+    pub fn with_runtime_context<F, T>(&self, f: F) -> Result<T, SharedRuntimeError>
+    where
+        F: FnOnce() -> T,
+    {
+        Ok(f())
+    }
+
     /// Shutdown all workers asynchronously.
     ///
     /// This should be called during application shutdown to cleanly stop all
@@ -469,6 +533,7 @@
     /// Worker errors are logged but do not cause the function to fail.
     pub async fn shutdown_async(&self) {
         debug!("Shutting down all workers asynchronously");
+        self.shutdown.store(true, Ordering::Release);
         let workers = {
             let mut workers_lock = self.workers.lock_or_panic();
             std::mem::take(&mut *workers_lock)
@@ -696,4 +761,71 @@
             "worker should not run or shut down after fork in child when restart_on_fork is false"
         );
     }
+
+    /// A single `PausableWorker` in `InvalidState` must
+    /// not abort the whole restart loop in `after_fork_parent` /
+    /// `after_fork_child`. The bad entry is logged and skipped; every
+    /// other worker still resumes after fork.
+    #[test]
+    fn after_fork_parent_skips_invalid_state_workers() {
+        let shared_runtime = SharedRuntime::new().unwrap();
+
+        let (good, good_rx) = make_test_worker();
+        let _ = shared_runtime.spawn_worker(good, true).unwrap();
+
+        // Second worker — we'll corrupt its entry into InvalidState below,
+        // simulating a previously-aborted task.
+        let (bad, _bad_rx) = make_test_worker();
+        let _ = shared_runtime.spawn_worker(bad, true).unwrap();
+
+        good_rx
+            .recv_timeout(Duration::from_secs(1))
+            .expect("good worker did not run before fork");
+
+        {
+            let mut workers = shared_runtime.workers.lock_or_panic();
+            workers[1].worker = PausableWorker::InvalidState;
+        }
+
+        shared_runtime.before_fork();
+        while good_rx.try_recv().is_ok() {}
+
+        let result = shared_runtime.after_fork_parent();
+
+        assert!(
+            result.is_ok(),
+            "after_fork_parent should not bail on a single InvalidState worker"
+        );
+        assert!(
+            good_rx.recv_timeout(Duration::from_secs(1)).is_ok(),
+            "good worker should resume after fork even if a peer is InvalidState"
+        );
+    }
+
+    /// `spawn_worker` after `shutdown` must reject with
+    /// `AlreadyShutdown` rather than silently registering a worker that
+    /// will never run. The shutdown state is observed under the workers
+    /// lock so the same guarantee holds against the during-shutdown race.
+    #[test]
+    fn spawn_worker_after_shutdown_should_be_rejected() {
+        let shared_runtime = SharedRuntime::new().unwrap();
+        shared_runtime.shutdown(None).unwrap();
+
+        let (worker, rx) = make_test_worker();
+        let result = shared_runtime.spawn_worker(worker, true);
+
+        assert!(
+            matches!(result, Err(SharedRuntimeError::AlreadyShutdown)),
+            "spawn_worker after shutdown should return AlreadyShutdown, got {result:?}"
+        );
+        assert_eq!(
+            shared_runtime.workers.lock_or_panic().len(),
+            0,
+            "no dead worker should be registered"
+        );
+        assert!(
+            rx.recv_timeout(Duration::from_millis(200)).is_err(),
+            "no worker should be running after shutdown"
+        );
+    }
 }
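
The lifecycle these tests exercise, sketched end to end. This is illustrative only: it is unix-specific, calls `libc::fork` directly, and real integrations would drive these hooks from their own fork handlers:

    shared_runtime.before_fork(); // pause every tracked worker
    match unsafe { libc::fork() } {
        -1 => panic!("fork failed"),
        0 => {
            // Child: workers with restart_on_fork == false are dropped; the
            // rest are reset and restarted, skipping InvalidState entries.
            shared_runtime.after_fork_child().unwrap();
        }
        _ => {
            // Parent: restart everything, again skipping InvalidState entries.
            shared_runtime.after_fork_parent().unwrap();
        }
    }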
