15 changes: 6 additions & 9 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
@@ -320,17 +320,14 @@ impl TraceExporterBuilder {
let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));

// On native, `C::new_client()` may capture `tokio::runtime::Handle::current()`
// internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context
// so that handle is available. On wasm this is a no-op — the JS event loop is
// always the implicit executor.
#[cfg(not(target_arch = "wasm32"))]
let _guard = shared_runtime
.runtime_handle()
// internally (e.g. `NativeCapabilities`). Run it inside the SharedRuntime's
// tokio context so that handle is available. On wasm this is a no-op — the
// JS event loop is always the implicit executor.
let capabilities = shared_runtime
.with_runtime_context(C::new_client)
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})?
.enter();
let capabilities = C::new_client();
})?;

// --- Platform-specific worker setup ---
// The blocks below spawn background workers via `SharedRuntime`. On
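
For illustration, the new shape threads the closure's return value back out through the `Result`; a minimal sketch, assuming an in-scope `SharedRuntime` named `shared_runtime` and a caller that can use `?` on `SharedRuntimeError`:

// On native, the closure runs with the SharedRuntime's tokio context
// entered, so constructors that capture `Handle::current()` work.
let inside = shared_runtime
    .with_runtime_context(|| tokio::runtime::Handle::try_current().is_ok())?;
assert!(inside); // holds on native; on wasm32 there is no tokio context to enter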
24 changes: 17 additions & 7 deletions libdd-shared-runtime-ffi/src/shared_runtime.rs
@@ -8,24 +8,33 @@ use std::ptr::NonNull;
use std::sync::Arc;

/// Error codes for SharedRuntime FFI operations.
///
/// # ABI stability
/// Discriminants are pinned explicitly. The numeric values are part of the
/// C ABI: existing variants must never be renumbered or reused, and new
/// variants must be appended with a fresh value. Inserting a variant in
/// the middle of the list silently misclassifies errors on any caller
/// compiled against an older header.
Contributor: The changes to the error codes are not necessary; users of libdatadog through the C API have no expectation of ABI backward compatibility.

Contributor (author): error codes removed
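
For context, the misclassification the doc comment warns about is purely numeric; a small sketch, with hypothetical OldCode/NewCode enums (not part of the crate) standing in for a caller's old header and a newer library build that inserted a variant mid-list:

#[repr(C)]
enum OldCode { InvalidArgument = 0, RuntimeUnavailable = 1, LockFailed = 2 }
#[repr(C)]
enum NewCode { InvalidArgument = 0, AlreadyShutdown = 1, RuntimeUnavailable = 2 }

// The library reports AlreadyShutdown as the raw value 1; a caller
// compiled against OldCode decodes 1 as RuntimeUnavailable. No compile
// or link error, just a silently wrong error code.
assert_eq!(NewCode::AlreadyShutdown as i32, OldCode::RuntimeUnavailable as i32);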

#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SharedRuntimeErrorCode {
/// Invalid argument provided (e.g. null handle).
InvalidArgument,
InvalidArgument = 0,
/// The runtime is not available or in an invalid state.
RuntimeUnavailable,
RuntimeUnavailable = 1,
/// Failed to acquire a lock on internal state.
LockFailed,
LockFailed = 2,
/// A worker operation failed.
WorkerError,
WorkerError = 3,
/// Failed to create the tokio runtime.
RuntimeCreation,
RuntimeCreation = 4,
/// Shutdown timed out.
ShutdownTimedOut,
ShutdownTimedOut = 5,
/// An unexpected panic occurred inside the FFI call.
#[cfg(feature = "catch_panic")]
Panic,
Panic = 6,
/// Operation rejected because the runtime has already been shut down.
AlreadyShutdown = 7,
}

/// Error returned by SharedRuntime FFI functions.
@@ -50,6 +59,7 @@ impl From<SharedRuntimeError> for SharedRuntimeFFIError {
fn from(err: SharedRuntimeError) -> Self {
let code = match &err {
SharedRuntimeError::RuntimeUnavailable => SharedRuntimeErrorCode::RuntimeUnavailable,
SharedRuntimeError::AlreadyShutdown => SharedRuntimeErrorCode::AlreadyShutdown,
SharedRuntimeError::LockFailed(_) => SharedRuntimeErrorCode::LockFailed,
SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError,
SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation,
154 changes: 143 additions & 11 deletions libdd-shared-runtime/src/shared_runtime/mod.rs
@@ -14,7 +14,7 @@ use crate::worker::Worker;
use futures::stream::{FuturesUnordered, StreamExt};
use libdd_common::MutexExt;
use pausable_worker::{PausableWorker, PausableWorkerError};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::{fmt, io};
use tracing::{debug, error};
@@ -26,7 +26,6 @@ use tracing::{debug, error};
mod native {
use super::*;
use pausable_worker::tokio_spawn_fn;
use std::sync::atomic::Ordering;
use tokio::runtime::{Builder, Runtime};

fn build_runtime() -> Result<Runtime, io::Error> {
@@ -42,21 +41,39 @@ mod native {
runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))),
workers: Arc::new(Mutex::new(Vec::new())),
next_worker_id: AtomicU64::new(1),
shutdown: AtomicBool::new(false),
})
}

/// Returns a clone of the tokio runtime handle managed by this SharedRuntime.
/// Run `f` with the shared tokio runtime entered as the current context.
///
/// Useful for synchronous initialization that calls into
/// [`tokio::runtime::Handle::current()`] (e.g., constructing an HTTP
/// client that captures the current handle internally).
///
/// # Fork safety
/// Tasks spawned via `tokio::spawn` / `Handle::current().spawn(...)`
/// inside `f` are NOT tracked by `SharedRuntime`: they will not be
/// paused before fork, restarted after fork, or shut down by
/// [`Self::shutdown`]. For background work, register a
/// [`crate::Worker`] via [`Self::spawn_worker`] instead.
///
/// # Errors
/// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down.
pub fn runtime_handle(&self) -> Result<tokio::runtime::Handle, SharedRuntimeError> {
Ok(self
/// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime
/// has been shut down or is in a fork window.
pub fn with_runtime_context<F, T>(&self, f: F) -> Result<T, SharedRuntimeError>
where
Comment on lines -51 to +65

Contributor: I am not sure this is any safer than before 🤔 Maybe a tiny bit nicer because it moves the wasm cfg to the runtime.

Contributor (author): runtime_handle() handed callers an owned, long-lived capability that let them bypass every safety guarantee. A task spawned from that handle is not in the workers list and is not paused by before_fork, so could it lead to confusion?
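(A sketch of that escape hatch; the background task is hypothetical:)

// Old shape: an owned handle leaves the SharedRuntime's bookkeeping.
let handle = shared_runtime.runtime_handle()?;
handle.spawn(async { /* long-lived background loop */ });
// This task appears in no `workers` entry, so `before_fork` cannot
// pause it and `shutdown` will not stop it.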

F: FnOnce() -> T,
{
let handle = self
.runtime
.lock_or_panic()
.as_ref()
.ok_or(SharedRuntimeError::RuntimeUnavailable)?
.handle()
.clone())
.clone();
let _guard = handle.enter();
Ok(f())
}

/// Spawn a PausableWorker on this runtime.
@@ -89,6 +106,14 @@ mod native {
let runtime_guard = self.runtime.lock_or_panic();
let mut workers_guard = self.workers.lock_or_panic();

// Reject post-shutdown spawns under the workers lock — this is the
// same lock `shutdown_async` acquires before draining, so once
// shutdown wins the workers lock, every subsequent spawn observes
// the flag and bails instead of silently registering a dead worker.
Contributor: This (and further comments introduced by this PR) is way too verbose for something fairly obvious.

Contributor (author): agree

if self.shutdown.load(Ordering::Acquire) {
return Err(SharedRuntimeError::AlreadyShutdown);
}

if let Some(rt) = runtime_guard.as_ref() {
if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) {
return Err(e.into());
@@ -165,8 +190,17 @@ mod native {

let mut workers_lock = self.workers.lock_or_panic();

// Log-and-continue: a single worker in `InvalidState` (e.g. its
// previous task was aborted) must not abort the whole restart
// loop and leave every other component dead. This matches the
// failure-tolerance pattern already used by `before_fork`.
for worker_entry in workers_lock.iter_mut() {
worker_entry.worker.start(tokio_spawn_fn(&handle))?;
if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) {
error!(
worker_id = worker_entry.id,
"Worker failed to restart after fork in parent: {:?}", e
);
}
}

Ok(())
@@ -196,9 +230,17 @@ mod native {

workers_lock.retain(|entry| entry.restart_on_fork);

// Log-and-continue: see `after_fork_parent`. In the child this
// matters even more — a single InvalidState worker must not
// silence every other component for the lifetime of the process.
for worker_entry in workers_lock.iter_mut() {
worker_entry.worker.reset();
worker_entry.worker.start(tokio_spawn_fn(&handle))?;
if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) {
error!(
worker_id = worker_entry.id,
"Worker failed to restart after fork in child: {:?}", e
);
}
}

Ok(())
@@ -234,6 +276,7 @@ mod native {
timeout: Option<std::time::Duration>,
) -> Result<(), SharedRuntimeError> {
debug!(?timeout, "Shutting down SharedRuntime");
self.shutdown.store(true, Ordering::Release);
match self.runtime.lock_or_panic().take() {
Some(runtime) => {
if let Some(timeout) = timeout {
@@ -337,6 +380,8 @@ impl WorkerHandle {
pub enum SharedRuntimeError {
/// The runtime is not available or in an invalid state.
RuntimeUnavailable,
/// Operation rejected because the runtime has already been shut down.
AlreadyShutdown,
/// Failed to acquire a lock on internal state.
LockFailed(String),
/// A worker operation failed.
@@ -353,6 +398,7 @@ impl fmt::Display for SharedRuntimeError {
Self::RuntimeUnavailable => {
write!(f, "Runtime is not available or in an invalid state")
}
Self::AlreadyShutdown => write!(f, "Runtime has already been shut down"),
Self::LockFailed(msg) => write!(f, "Failed to acquire lock: {}", msg),
Self::WorkerError(err) => write!(f, "Worker error: {}", err),
Self::RuntimeCreation(err) => {
@@ -397,6 +443,10 @@ pub struct SharedRuntime {
runtime: Arc<Mutex<Option<Arc<tokio::runtime::Runtime>>>>,
workers: Arc<Mutex<Vec<WorkerEntry>>>,
next_worker_id: AtomicU64,
/// Set once `shutdown` / `shutdown_async` is called. After this point
/// `spawn_worker` rejects with `AlreadyShutdown` instead of silently
/// registering a worker that will never run.
shutdown: AtomicBool,
}

impl SharedRuntime {
@@ -419,6 +469,7 @@ impl SharedRuntime {
Ok(Self {
workers: Arc::new(Mutex::new(Vec::new())),
next_worker_id: AtomicU64::new(1),
shutdown: AtomicBool::new(false),
})
}
}
@@ -430,14 +481,16 @@ impl SharedRuntime {
worker: T,
restart_on_fork: bool,
) -> Result<WorkerHandle, SharedRuntimeError> {
use std::sync::atomic::Ordering;

let boxed_worker: BoxedWorker = Box::new(worker);
debug!(?boxed_worker, "Spawning worker on SharedRuntime");
let mut pausable_worker = PausableWorker::new(boxed_worker);

let mut workers_guard = self.workers.lock_or_panic();

if self.shutdown.load(Ordering::Acquire) {
return Err(SharedRuntimeError::AlreadyShutdown);
}

if let Err(e) = pausable_worker.start(|future| {
use futures_util::FutureExt;
let (remote, handle) = future.remote_handle();
@@ -461,6 +514,17 @@ impl SharedRuntime {
})
}

/// On wasm32, [`Self::with_runtime_context`] is a no-op — the JS event
/// loop is the implicit executor, so there is no tokio context to enter.
/// The closure is invoked unchanged so callers can be platform-agnostic.
#[cfg(target_arch = "wasm32")]
pub fn with_runtime_context<F, T>(&self, f: F) -> Result<T, SharedRuntimeError>
where
F: FnOnce() -> T,
{
Ok(f())
}

/// Shutdown all workers asynchronously.
///
/// This should be called during application shutdown to cleanly stop all
Expand All @@ -469,6 +533,7 @@ impl SharedRuntime {
/// Worker errors are logged but do not cause the function to fail.
pub async fn shutdown_async(&self) {
debug!("Shutting down all workers asynchronously");
self.shutdown.store(true, Ordering::Release);
let workers = {
let mut workers_lock = self.workers.lock_or_panic();
std::mem::take(&mut *workers_lock)
Expand Down Expand Up @@ -696,4 +761,71 @@ mod tests {
"worker should not run or shut down after fork in child when restart_on_fork is false"
);
}

/// A single `PausableWorker` in `InvalidState` must
/// not abort the whole restart loop in `after_fork_parent` /
/// `after_fork_child`. The bad entry is logged and skipped; every
/// other worker still resumes after fork.
#[test]
fn after_fork_parent_skips_invalid_state_workers() {
let shared_runtime = SharedRuntime::new().unwrap();

let (good, good_rx) = make_test_worker();
let _ = shared_runtime.spawn_worker(good, true).unwrap();

// Second worker — we'll corrupt its entry into InvalidState below,
// simulating a previously-aborted task.
let (bad, _bad_rx) = make_test_worker();
let _ = shared_runtime.spawn_worker(bad, true).unwrap();

good_rx
.recv_timeout(Duration::from_secs(1))
.expect("good worker did not run before fork");

{
let mut workers = shared_runtime.workers.lock_or_panic();
workers[1].worker = PausableWorker::InvalidState;
}

shared_runtime.before_fork();
while good_rx.try_recv().is_ok() {}

let result = shared_runtime.after_fork_parent();

assert!(
result.is_ok(),
"after_fork_parent should not bail on a single InvalidState worker"
);
assert!(
good_rx.recv_timeout(Duration::from_secs(1)).is_ok(),
"good worker should resume after fork even if a peer is InvalidState"
);
}

/// `spawn_worker` after `shutdown` must reject with
/// `AlreadyShutdown` rather than silently registering a worker that
/// will never run. The shutdown state is observed under the workers
/// lock so the same guarantee holds against the during-shutdown race.
#[test]
fn spawn_worker_after_shutdown_should_be_rejected() {
let shared_runtime = SharedRuntime::new().unwrap();
shared_runtime.shutdown(None).unwrap();

let (worker, rx) = make_test_worker();
let result = shared_runtime.spawn_worker(worker, true);

assert!(
matches!(result, Err(SharedRuntimeError::AlreadyShutdown)),
"spawn_worker after shutdown should return AlreadyShutdown, got {result:?}"
);
assert_eq!(
shared_runtime.workers.lock_or_panic().len(),
0,
"no dead worker should be registered"
);
assert!(
rx.recv_timeout(Duration::from_millis(200)).is_err(),
"no worker should be running after shutdown"
);
}
}
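
For the fork-safety note on with_runtime_context ("register a crate::Worker via spawn_worker instead"), a rough sketch of the recommended pattern follows; the Heartbeat type and the exact shape of the Worker trait are assumptions, not the crate's real API:

// Hypothetical worker; the crate's `Worker` trait may differ in shape.
struct Heartbeat;

#[async_trait::async_trait]
impl Worker for Heartbeat {
    async fn run(&mut self) {
        // Periodic background work. Registered workers are tracked by
        // SharedRuntime: paused before fork, restarted after fork when
        // restart_on_fork is true, and stopped by shutdown.
    }
}

let shared_runtime = SharedRuntime::new()?;
let _worker_handle = shared_runtime.spawn_worker(Heartbeat, /* restart_on_fork */ true)?;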