Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(prof): avoid dlclose if threads did not join #3075

Merged
merged 3 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ serde_json = {version = "1.0"}
rand = { version = "0.8.5" }
rand_distr = { version = "0.4.3" }
rustc-hash = "1.1.0"
thiserror = "2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
Expand Down
21 changes: 15 additions & 6 deletions profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod exception;
mod timeline;

use crate::config::{SystemSettings, INITIAL_SYSTEM_SETTINGS};
use crate::zend::datadog_sapi_globals_request_info;
use bindings::{
self as zend, ddog_php_prof_php_version, ddog_php_prof_php_version_id, ZendExtension,
ZendResult,
Expand All @@ -46,8 +47,6 @@ use std::sync::{Arc, Once};
use std::time::{Duration, Instant};
use uuid::Uuid;

use crate::zend::datadog_sapi_globals_request_info;

/// Name of the profiling module and zend_extension. Must not contain any
/// interior null bytes and must be null terminated.
static PROFILER_NAME: &[u8] = b"datadog-profiling\0";
Expand Down Expand Up @@ -889,11 +888,21 @@ extern "C" fn startup(extension: *mut ZendExtension) -> ZendResult {
ZendResult::Success
}

extern "C" fn shutdown(_extension: *mut ZendExtension) {
extern "C" fn shutdown(extension: *mut ZendExtension) {
#[cfg(debug_assertions)]
trace!("shutdown({:p})", _extension);

Profiler::shutdown(Duration::from_secs(2));
trace!("shutdown({:p})", extension);

// If a timeout was reached, then the thread is possibly alive.
// This means the engine cannot unload our handle, or else we'd hit
// immediate undefined behavior (and likely crash).
if let Err(err) = Profiler::shutdown(Duration::from_secs(2)) {
let num_failures = err.num_failures;
error!("{num_failures} thread(s) failed to join, intentionally leaking the extension's handle to prevent unloading");
// SAFETY: during mshutdown, we have ownership of the extension struct.
// Our threads (which failed to join) do not mutate this struct at all
// either, providing no races.
unsafe { (*extension).handle = ptr::null_mut() }
}

// SAFETY: calling in shutdown before zai config is shutdown, and after
// all configuration is done being accessed. Well... in the happy-path,
Expand Down
1 change: 1 addition & 0 deletions profiling/src/php_ffi.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ void datadog_php_profiling_copy_string_view_into_zval(zval *dest, zai_str view,
#ifdef CFG_TEST
(void)dest;
(void)view;
(void)persistent;
ZEND_ASSERT(0);
#else
if (view.len == 0) {
Expand Down
40 changes: 25 additions & 15 deletions profiling/src/profiling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use crate::bindings::{datadog_php_profiling_get_profiling_context, zend_execute_
use crate::config::SystemSettings;
use crate::{CLOCKS, TAGS};
use chrono::Utc;
#[cfg(feature = "timeline")]
use core::{ptr, str};
use crossbeam_channel::{Receiver, Sender, TrySendError};
use datadog_profiling::api::{
Function, Label as ApiLabel, Location, Period, Sample, ValueType as ApiValueType,
Expand All @@ -40,6 +38,8 @@ use std::sync::{Arc, Barrier};
use std::thread::JoinHandle;
use std::time::{Duration, Instant, SystemTime};

#[cfg(feature = "timeline")]
use core::{ptr, str};
#[cfg(feature = "timeline")]
use std::time::UNIX_EPOCH;

Expand Down Expand Up @@ -689,33 +689,39 @@ impl Profiler {
/// Completes the shutdown process; to start it, call [Profiler::stop]
/// before calling [Profiler::shutdown].
/// Note the timeout is per thread, and there may be multiple threads.
/// Returns Ok(true) if any thread hit a timeout.
///
/// Safety: only safe to be called in `SHUTDOWN`/`MSHUTDOWN` phase
pub fn shutdown(timeout: Duration) {
pub fn shutdown(timeout: Duration) -> Result<(), JoinError> {
// SAFETY: the `take` access is a thread-safe API, and the PROFILER is
// not being mutated outside single-threaded phases such as minit and
// mshutdown.
if let Some(profiler) = unsafe { PROFILER.take() } {
profiler.join_collector_and_uploader(timeout);
profiler.join_collector_and_uploader(timeout)
} else {
Ok(())
}
}

pub fn join_collector_and_uploader(self, timeout: Duration) {
fn join_collector_and_uploader(self, timeout: Duration) -> Result<(), JoinError> {
if self.should_join.load(Ordering::SeqCst) {
thread_utils::join_timeout(
self.time_collector_handle,
timeout,
"Recent samples may be lost.",
);
let result1 = thread_utils::join_timeout(self.time_collector_handle, timeout);
if let Err(err) = &result1 {
warn!("{err}, recent samples may be lost");
}

// Wait for the time_collector to join, since that will drop
// the sender half of the channel that the uploader is
// holding, allowing it to finish.
thread_utils::join_timeout(
self.uploader_handle,
timeout,
"Recent samples are most likely lost.",
);
let result2 = thread_utils::join_timeout(self.uploader_handle, timeout);
if let Err(err) = &result2 {
warn!("{err}, recent samples are most likely lost");
}

let num_failures = result1.is_err() as usize + result2.is_err() as usize;
result2.or(result1).map_err(|_| JoinError { num_failures })
} else {
Ok(())
}
}

Expand Down Expand Up @@ -1288,6 +1294,10 @@ impl Profiler {
}
}

pub struct JoinError {
pub num_failures: usize,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
32 changes: 20 additions & 12 deletions profiling/src/profiling/thread_utils.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#[cfg(php_zts)]
use crate::sapi::Sapi;
use crate::SAPI;
#[cfg(php_zts)]
use libc::c_char;
use libc::sched_yield;
use log::warn;
use once_cell::sync::OnceCell;
use std::mem::MaybeUninit;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

#[cfg(php_zts)]
use crate::sapi::Sapi;
#[cfg(php_zts)]
use libc::c_char;

/// Spawns a thread and masks off the signals that the Zend Engine uses.
pub fn spawn<F, T>(name: &str, f: F) -> JoinHandle<T>
where
Expand Down Expand Up @@ -50,11 +50,18 @@ where
}
}

#[derive(thiserror::Error, Debug)]
#[error("timeout of {timeout_ms} ms reached when joining thread {thread}")]
pub struct TimeoutError {
thread: String,
timeout_ms: u128,
}

/// Waits for the handle to be finished. If finished, it will join the handle.
/// Otherwise, it will leak the handle.
/// Otherwise, it will leak the handle and return an error.
/// # Panics
/// Panics if the thread being joined has panic'd.
pub fn join_timeout(handle: JoinHandle<()>, timeout: Duration, impact: &str) {
/// If the thread being joined has panic'd, this will resume the panic.
pub fn join_timeout(handle: JoinHandle<()>, timeout: Duration) -> Result<(), TimeoutError> {
// After notifying the other threads, it's likely they'll need some time
// to respond adequately. Joining on the JoinHandle is supposed to be the
// correct way to do this, but we've observed this can panic:
Expand All @@ -65,15 +72,16 @@ pub fn join_timeout(handle: JoinHandle<()>, timeout: Duration, impact: &str) {
while !handle.is_finished() {
unsafe { sched_yield() };
if start.elapsed() >= timeout {
let name = handle.thread().name().unwrap_or("{unknown}");
warn!("Timeout of {timeout:?} reached when joining thread '{name}'. {impact}");
return;
let thread = handle.thread().name().unwrap_or("{unknown}").to_string();
let timeout_ms = timeout.as_millis();
return Err(TimeoutError { thread, timeout_ms });
}
}

if let Err(err) = handle.join() {
std::panic::resume_unwind(err)
std::panic::resume_unwind(err);
}
Ok(())
}

thread_local! {
Expand Down
Loading