From c3ce1ce4dd366e222c9b02ec7c706ec5058d0d50 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 01/13] Heap profiler with leak tracking --- quickwit/Cargo.lock | 4 + quickwit/Cargo.toml | 1 + quickwit/quickwit-cli/Cargo.toml | 8 + quickwit/quickwit-cli/src/jemalloc.rs | 4 + quickwit/quickwit-common/Cargo.toml | 9 + quickwit/quickwit-common/src/alloc_tracker.rs | 232 ++++++++++++++++++ .../quickwit-common/src/jemalloc_profiled.rs | 202 +++++++++++++++ quickwit/quickwit-common/src/lib.rs | 4 + quickwit/quickwit-serve/Cargo.toml | 3 + .../src/developer_api/heap_prof.rs | 53 ++++ .../src/developer_api/heap_prof_disabled.rs | 29 +++ .../quickwit-serve/src/developer_api/mod.rs | 9 +- 12 files changed, 555 insertions(+), 3 deletions(-) create mode 100644 quickwit/quickwit-common/src/alloc_tracker.rs create mode 100644 quickwit/quickwit-common/src/jemalloc_profiled.rs create mode 100644 quickwit/quickwit-serve/src/developer_api/heap_prof.rs create mode 100644 quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 51a8f28ad15..b9158f299d8 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6866,6 +6866,7 @@ dependencies = [ "anyhow", "async-speed-limit", "async-trait", + "backtrace", "bytesize", "coarsetime", "dyn-clone", @@ -6887,10 +6888,13 @@ dependencies = [ "regex", "serde", "serde_json", + "serial_test", "siphasher 0.3.11", "sysinfo", "tempfile", "thiserror 1.0.69", + "tikv-jemalloc-ctl", + "tikv-jemallocator", "tokio", "tokio-metrics", "tokio-stream", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 1f687c96d72..460dfb06756 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -85,6 +85,7 @@ assert-json-diff = "2" async-compression = { version = "0.4", features = ["tokio", "gzip"] } async-speed-limit = "0.4" async-trait = "0.1" +backtrace = "0.3" base64 = "0.22" binggan = { version = "0.14" } bytes = { version = "1", features = ["serde"] } diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 41a8fdce5e0..524e77fc8fd 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -81,6 +81,10 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } [features] jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"] +jemalloc-profiled = [ + "quickwit-common/jemalloc-profiled", + "quickwit-serve/jemalloc-profiled" +] ci-test = [] pprof = ["quickwit-serve/pprof"] openssl-support = ["openssl-probe"] @@ -127,6 +131,10 @@ release-macos-feature-vendored-set = [ "quickwit-metastore/postgres", "quickwit-doc-mapper/multilang", ] +release-heap-profiled = [ + "release-feature-set", + "jemalloc-profiled" +] [package.metadata.cargo-machete] # used to enable the `multilang` feature diff --git a/quickwit/quickwit-cli/src/jemalloc.rs b/quickwit/quickwit-cli/src/jemalloc.rs index 71969f79909..e5223e5ee31 100644 --- a/quickwit/quickwit-cli/src/jemalloc.rs +++ b/quickwit/quickwit-cli/src/jemalloc.rs @@ -19,6 +19,10 @@ use tikv_jemallocator::Jemalloc; use tracing::error; #[global_allocator] +#[cfg(feature = "jemalloc-profiled")] +pub static GLOBAL: quickwit_common::jemalloc_profiled::JemallocProfiled = + quickwit_common::jemalloc_profiled::JemallocProfiled(Jemalloc); +#[cfg(not(feature = "jemalloc-profiled"))] pub static GLOBAL: Jemalloc = Jemalloc; const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1); diff --git a/quickwit/quickwit-common/Cargo.toml 
b/quickwit/quickwit-common/Cargo.toml index 3abcd35f3d5..5f0e7f1375b 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -14,6 +14,7 @@ license.workspace = true anyhow = { workspace = true } async-speed-limit = { workspace = true } async-trait = { workspace = true } +backtrace = { workspace = true, optional = true } bytesize = { workspace = true } coarsetime = { workspace = true } dyn-clone = { workspace = true } @@ -37,6 +38,8 @@ siphasher = { workspace = true } sysinfo = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } +tikv-jemallocator = { workspace = true, optional = true } +tikv-jemalloc-ctl = { workspace = true, optional = true } tokio = { workspace = true } tokio-metrics = { workspace = true } tokio-stream = { workspace = true } @@ -47,9 +50,15 @@ tracing = { workspace = true } [features] testsuite = [] named_tasks = ["tokio/tracing"] +jemalloc-profiled = [ + "dep:backtrace", + "dep:tikv-jemallocator", + "dep:tikv-jemalloc-ctl" +] [dev-dependencies] serde_json = { workspace = true } tempfile = { workspace = true } proptest = { workspace = true } +serial_test = { workspace = true } tokio = { workspace = true, features = ["test-util"] } diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs new file mode 100644 index 00000000000..7c2bf3ae483 --- /dev/null +++ b/quickwit/quickwit-common/src/alloc_tracker.rs @@ -0,0 +1,232 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::sync::Mutex; + +use once_cell::sync::Lazy; + +const DEFAULT_REPORTING_INTERVAL: u64 = 1024 * 1024 * 1024; + +static ALLOCATION_TRACKER: Lazy> = + Lazy::new(|| Mutex::new(Allocations::default())); + +#[derive(Debug)] +struct Allocation { + pub callsite_hash: u64, + pub size: u64, +} + +#[derive(Debug, Copy, Clone)] +pub struct Statistic { + pub count: u64, + pub size: u64, + pub last_print: u64, +} + +/// WARN: +/// - keys and values in these maps should not allocate! 
+/// - we assume HashMaps don't allocate if their capacity is not exceeded +#[derive(Debug)] +struct Allocations { + memory_locations: HashMap, + callsite_statistics: HashMap, + is_started: bool, + reporting_interval: u64, +} + +impl Default for Allocations { + fn default() -> Self { + Self { + memory_locations: HashMap::with_capacity(128 * 1024), + callsite_statistics: HashMap::with_capacity(32 * 1024), + is_started: false, + reporting_interval: DEFAULT_REPORTING_INTERVAL, + } + } +} + +// pub fn log_dump() { +// tracing::info!(allocations=?ALLOCATION_TRACKER.lock().unwrap(), "dump"); +// } + +pub fn init(alloc_size_triggering_backtrace: Option) { + let mut guard = ALLOCATION_TRACKER.lock().unwrap(); + guard.memory_locations.clear(); + guard.callsite_statistics.clear(); + guard.is_started = true; + guard.reporting_interval = + alloc_size_triggering_backtrace.unwrap_or(DEFAULT_REPORTING_INTERVAL); +} + +pub enum AllocRecordingResponse { + ThresholdExceeded(Statistic), + ThresholdNotExceeded, + TrackerFull(&'static str), + NotStarted, +} + +/// Records an allocation and occasionally reports the cumulated allocation size +/// for the provided callsite_hash. +/// +/// Every time a the total allocated size with the same callsite_hash +/// exceeds the previous reported value by at least reporting_interval, that +/// allocated size is reported. +/// +/// WARN: this function should not allocate! +pub fn record_allocation(callsite_hash: u64, size: u64, ptr: *mut u8) -> AllocRecordingResponse { + let mut guard = ALLOCATION_TRACKER.lock().unwrap(); + if !guard.is_started { + return AllocRecordingResponse::NotStarted; + } + if guard.memory_locations.capacity() == guard.memory_locations.len() { + return AllocRecordingResponse::TrackerFull("memory_locations"); + } + if guard.callsite_statistics.capacity() == guard.callsite_statistics.len() { + return AllocRecordingResponse::TrackerFull("memory_locations"); + } + guard.memory_locations.insert( + ptr as usize, + Allocation { + callsite_hash, + size, + }, + ); + let reporting_interval = guard.reporting_interval; + let entry = guard + .callsite_statistics + .entry(callsite_hash) + .and_modify(|stat| { + stat.count += 1; + stat.size += size; + }) + .or_insert(Statistic { + count: 1, + size, + last_print: 0, + }); + let new_threshold_exceeded = entry.size > (entry.last_print + reporting_interval); + if new_threshold_exceeded { + let reported_statistic = *entry; + entry.last_print = entry.size; + AllocRecordingResponse::ThresholdExceeded(reported_statistic) + } else { + AllocRecordingResponse::ThresholdNotExceeded + } +} + +/// WARN: this function should not allocate! +pub fn record_deallocation(ptr: *mut u8) { + let mut guard = ALLOCATION_TRACKER.lock().unwrap(); + if !guard.is_started { + return; + } + let Some(Allocation { + size, + callsite_hash, + .. 
+ }) = guard.memory_locations.remove(&(ptr as usize)) + else { + // this was allocated before the tracking started + return; + }; + if let Entry::Occupied(mut content) = guard.callsite_statistics.entry(callsite_hash) { + content.get_mut().count -= 1; + content.get_mut().size -= size; + if content.get().count == 0 { + content.remove(); + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + #[serial_test::file_serial] + fn test_record_allocation_and_deallocation() { + init(Some(2000)); + let callsite_hash_1 = 777; + + let ptr_1 = 0x1 as *mut u8; + let response = record_allocation(callsite_hash_1, 1500, ptr_1); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + + let ptr_2 = 0x2 as *mut u8; + let response = record_allocation(callsite_hash_1, 1500, ptr_2); + let AllocRecordingResponse::ThresholdExceeded(statistic) = response else { + panic!("Expected ThresholdExceeded response"); + }; + assert_eq!(statistic.count, 2); + assert_eq!(statistic.size, 3000); + assert_eq!(statistic.last_print, 0); + + record_deallocation(ptr_2); + + // the threshold was already crossed + let ptr_3 = 0x3 as *mut u8; + let response = record_allocation(callsite_hash_1, 1500, ptr_3); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + + // this is a brand new call site with different statistics + let callsite_hash_2 = 42; + let ptr_3 = 0x3 as *mut u8; + let response = record_allocation(callsite_hash_2, 1500, ptr_3); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + } + + #[test] + #[serial_test::file_serial] + fn test_tracker_full() { + init(Some(1024 * 1024 * 1024)); + let memory_locations_capacity = ALLOCATION_TRACKER + .lock() + .unwrap() + .memory_locations + .capacity(); + + for i in 0..memory_locations_capacity { + let ptr = (i + 1) as *mut u8; + let response = record_allocation(777, 10, ptr); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + } + let response = record_allocation(777, 10, (memory_locations_capacity + 1) as *mut u8); + assert!(matches!( + response, + AllocRecordingResponse::TrackerFull("memory_locations") + )); + // make sure that the map didn't grow + let current_memory_locations_capacity = ALLOCATION_TRACKER + .lock() + .unwrap() + .memory_locations + .capacity(); + assert_eq!(current_memory_locations_capacity, memory_locations_capacity); + } +} diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs new file mode 100644 index 00000000000..179234f09b5 --- /dev/null +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -0,0 +1,202 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
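Before moving on to the allocator wrapper below, here is a minimal usage sketch (not part of the patch) of the `alloc_tracker` API introduced above. It assumes code living inside `quickwit-common` (the module is crate-private) and uses the patch-01 signatures; the callsite hash, pointer and size are illustrative values supplied by the caller.

```rust
use crate::alloc_tracker::{self, AllocRecordingResponse};

// Illustrative driver for the tracker above. With a 1 GiB reporting interval,
// a callsite is reported again only once its cumulated live allocations exceed
// the last reported size by more than 1 GiB.
fn tracker_usage_sketch(callsite_hash: u64, ptr: *mut u8, size: u64) {
    alloc_tracker::init(Some(1024 * 1024 * 1024));
    match alloc_tracker::record_allocation(callsite_hash, size, ptr) {
        AllocRecordingResponse::ThresholdExceeded(stat) => {
            // the caller decides how to report (log line, backtrace, ...)
            println!("callsite {callsite_hash}: {} allocs, {} bytes live", stat.count, stat.size);
        }
        AllocRecordingResponse::TrackerFull(table) => println!("tracker table {table} is full"),
        AllocRecordingResponse::ThresholdNotExceeded | AllocRecordingResponse::NotStarted => {}
    }
    // freeing the block shrinks the per-callsite statistics again
    alloc_tracker::record_deallocation(ptr);
}
```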
+ +use std::alloc::{GlobalAlloc, Layout}; +use std::hash::Hasher; +use std::io::Write; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +use tikv_jemallocator::Jemalloc; +use tracing::{error, info}; + +use crate::alloc_tracker::{self, AllocRecordingResponse}; + +const DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING: usize = 256 * 1024; + +// Atomics are used to communicate configurations between the start/stop +// endpoints and the JemallocProfiled allocator wrapper. + +/// The minimum allocation size that is recorded by the tracker. +static MIN_ALLOC_SIZE_FOR_PROFILING: AtomicUsize = + AtomicUsize::new(DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING); + +/// Whether the profiling is started or not. +static ENABLED: AtomicBool = AtomicBool::new(false); + +/// Starts measuring heap allocations and logs important leaks. +/// +/// This function uses a wrapper around the global Jemalloc allocator to +/// instrument it. Each time an allocation bigger than +/// min_alloc_size_for_profiling is performed, it is recorded in a map and the +/// statistics for its call site are updated. +/// +/// During profiling, the statistics per call site are used to log when specific +/// thresholds are exceeded. For each call site, the allocated memory is +/// logged (with a backtrace) every time it exceeds the last logged allocated +/// memory by at least alloc_size_triggering_backtrace. +pub fn start_profiling( + min_alloc_size_for_profiling: Option, + alloc_size_triggering_backtrace: Option, +) { + // Call backtrace once to warmup symbolization allocations (~30MB) + backtrace::trace(|frame| { + backtrace::resolve_frame(frame, |_| {}); + true + }); + + alloc_tracker::init(alloc_size_triggering_backtrace); + + let min_alloc_size_for_profiling = + min_alloc_size_for_profiling.unwrap_or(DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING); + // Use strong ordering to make sure all threads see these changes in this order + MIN_ALLOC_SIZE_FOR_PROFILING.store(min_alloc_size_for_profiling, Ordering::SeqCst); + let previously_enabled = ENABLED.swap(true, Ordering::SeqCst); + + info!( + min_alloc_size_for_profiling, + alloc_size_triggering_backtrace, previously_enabled, "heap profiling running" + ); +} + +/// Stops measuring heap allocations. +/// +/// The allocation tracking tables and the symbol cache are not cleared. +pub fn stop_profiling() { + // Use strong ordering to make sure all threads see these changes in this order + let previously_enabled = ENABLED.swap(false, Ordering::SeqCst); + MIN_ALLOC_SIZE_FOR_PROFILING.store(DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING, Ordering::SeqCst); + + info!(previously_enabled, "heap profiling stopped"); + // alloc_tracker::log_dump(); + // backtrace::clear_symbol_cache(); +} + +/// Wraps the Jemalloc global allocator calls with tracking routines. +/// +/// The tracking routines are called only when [ENABLED] is set to true (calling +/// [start_profiling()]), but we don't enforce any synchronization (we load it with +/// Ordering::Relaxed) because it's fine to miss or record extra allocation events. 
+pub struct JemallocProfiled(pub Jemalloc); + +unsafe impl GlobalAlloc for JemallocProfiled { + #[inline] + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + let ptr = unsafe { self.0.alloc(layout) }; + if ENABLED.load(Ordering::Relaxed) { + track_alloc_call(ptr, layout); + } + ptr + } + + #[inline] + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + let ptr = unsafe { self.0.alloc_zeroed(layout) }; + if ENABLED.load(Ordering::Relaxed) { + track_alloc_call(ptr, layout); + } + ptr + } + + #[inline] + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + if ENABLED.load(Ordering::Relaxed) { + track_dealloc_call(ptr, layout); + } + unsafe { self.0.dealloc(ptr, layout) } + } + + #[inline] + unsafe fn realloc(&self, old_ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + let new_ptr = unsafe { self.0.realloc(old_ptr, layout, new_size) }; + if ENABLED.load(Ordering::Relaxed) { + track_realloc_call(old_ptr, new_ptr, layout, new_size); + } + new_ptr + } +} + +#[inline] +fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { + { + let mut lock = std::io::stdout().lock(); + let _ = writeln!( + &mut lock, + "htrk callsite={} allocs={} size={}MiB", + callsite_hash, + stat.count, + stat.size / 1024 / 1024 + ); + backtrace::trace(|frame| { + backtrace::resolve_frame(frame, |symbol| { + if let Some(symbole_name) = symbol.name() { + let _ = writeln!(&mut lock, "{}", symbole_name); + } else { + let _ = writeln!(&mut lock, "symb failed"); + } + }); + true + }); + } +} + +#[inline] +fn backtrace_hash() -> u64 { + let mut hasher = fnv::FnvHasher::default(); + backtrace::trace(|frame| { + hasher.write_usize(frame.ip() as usize); + true + }); + hasher.finish() +} + +#[cold] +fn track_alloc_call(ptr: *mut u8, layout: Layout) { + if layout.size() > MIN_ALLOC_SIZE_FOR_PROFILING.load(Ordering::Relaxed) { + let callsite_hash = backtrace_hash(); + let recording_response = + alloc_tracker::record_allocation(callsite_hash, layout.size() as u64, ptr); + + match recording_response { + AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => { + print_backtrace(callsite_hash, stat_for_trace); + // Could we use tracing to caracterize the call site here? 
+ // tracing::info!(size = alloc_size_for_trace, "large alloc"); + } + AllocRecordingResponse::TrackerFull(reason) => { + // this message might be displayed multiple times but that's fine + error!("{reason} full, profiling stopped"); + ENABLED.store(false, Ordering::Relaxed); + } + AllocRecordingResponse::ThresholdNotExceeded => {} + AllocRecordingResponse::NotStarted => {} + } + } +} + +#[cold] +fn track_dealloc_call(ptr: *mut u8, layout: Layout) { + if layout.size() > MIN_ALLOC_SIZE_FOR_PROFILING.load(Ordering::Relaxed) { + alloc_tracker::record_deallocation(ptr); + } +} + +#[cold] +fn track_realloc_call( + _old_ptr: *mut u8, + _new_pointer: *mut u8, + _current_layout: Layout, + _new_size: usize, +) { + // TODO handle realloc +} diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 12987898b0f..2b9fa51474d 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -16,9 +16,13 @@ mod coolid; +#[cfg(feature = "jemalloc-profiled")] +pub(crate) mod alloc_tracker; pub mod binary_heap; pub mod fs; pub mod io; +#[cfg(feature = "jemalloc-profiled")] +pub mod jemalloc_profiled; mod kill_switch; pub mod metrics; pub mod net; diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 88c3e4278b4..1c66ccc0e31 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -106,6 +106,9 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } pprof = [ "dep:pprof" ] +jemalloc-profiled = [ + "quickwit-common/jemalloc-profiled" +] testsuite = [] sqs-for-tests = [ "quickwit-indexing/sqs", diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs new file mode 100644 index 00000000000..deff7311932 --- /dev/null +++ b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs @@ -0,0 +1,53 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
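Stepping back to the allocator wrapper for a moment: the realloc path above is left as a TODO. One possible direction, shown here only as an assumption about a future implementation (it is not what this patch does), is to treat a tracked realloc as a deallocation of the old block followed by a new allocation recorded against the current callsite, reusing the helpers already defined in `jemalloc_profiled.rs`:

```rust
// Hypothetical sketch for the `track_realloc_call` TODO, written as if it
// lived in jemalloc_profiled.rs and reused its patch-01 items.
fn track_realloc_sketch(old_ptr: *mut u8, new_ptr: *mut u8, layout: Layout, new_size: usize) {
    let min_size = MIN_ALLOC_SIZE_FOR_PROFILING.load(Ordering::Relaxed);
    if layout.size() > min_size {
        // the old block may have been recorded when it was first allocated
        alloc_tracker::record_deallocation(old_ptr);
    }
    if new_size > min_size {
        let callsite_hash = backtrace_hash();
        // a complete implementation would also handle ThresholdExceeded and
        // TrackerFull like track_alloc_call does
        let _ = alloc_tracker::record_allocation(callsite_hash, new_size as u64, new_ptr);
    }
}
```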
+ +use quickwit_common::jemalloc_profiled::{start_profiling, stop_profiling}; +use serde::Deserialize; +use warp::Filter; +use warp::reply::Reply; + +pub fn heap_prof_handlers() +-> impl Filter + Clone { + #[derive(Deserialize)] + struct ProfilerQueryParams { + min_alloc_size: Option, + backtrace_every: Option, + } + + let start_profiler = { + warp::path!("heap-prof" / "start") + .and(warp::query::()) + .and_then(move |params: ProfilerQueryParams| start_profiler_handler(params)) + }; + + let stop_profiler = { warp::path!("heap-prof" / "stop").and_then(stop_profiler_handler) }; + + async fn start_profiler_handler( + params: ProfilerQueryParams, + ) -> Result, warp::Rejection> { + start_profiling(params.min_alloc_size, params.backtrace_every); + let resp = warp::reply::with_status("Heap profiling started", warp::http::StatusCode::OK) + .into_response(); + Ok(resp) + } + + async fn stop_profiler_handler() -> Result, warp::Rejection> { + stop_profiling(); + let resp = warp::reply::with_status("Heap profiling stopped", warp::http::StatusCode::OK) + .into_response(); + Ok(resp) + } + + start_profiler.or(stop_profiler) +} diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs new file mode 100644 index 00000000000..a71f724ae0d --- /dev/null +++ b/quickwit/quickwit-serve/src/developer_api/heap_prof_disabled.rs @@ -0,0 +1,29 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use warp::Filter; + +fn not_implemented_handler() -> impl warp::Reply { + warp::reply::with_status( + "Quickwit was compiled without the `jemalloc-profiled` feature", + warp::http::StatusCode::NOT_IMPLEMENTED, + ) +} + +pub fn heap_prof_handlers() +-> impl Filter + Clone { + let start_profiler = { warp::path!("heap-prof" / "start").map(not_implemented_handler) }; + let stop_profiler = { warp::path!("heap-prof" / "stop").map(not_implemented_handler) }; + start_profiler.or(stop_profiler) +} diff --git a/quickwit/quickwit-serve/src/developer_api/mod.rs b/quickwit/quickwit-serve/src/developer_api/mod.rs index 4163db9c933..c7722d3a581 100644 --- a/quickwit/quickwit-serve/src/developer_api/mod.rs +++ b/quickwit/quickwit-serve/src/developer_api/mod.rs @@ -13,14 +13,16 @@ // limitations under the License. 
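For reference, once these routes are wired into the developer API router (see the `mod.rs` change below), profiling can be toggled with plain HTTP calls. The route prefix and port are not visible in this diff; the example assumes the developer API is mounted under `/api/developer` on the default REST port.

```sh
# start profiling: track allocations of at least 64 KiB, log a backtrace every
# extra 1 GiB allocated by a given callsite
curl "http://127.0.0.1:7280/api/developer/heap-prof/start?min_alloc_size=65536&backtrace_every=1073741824"

# stop profiling (the tracking tables are kept, see stop_profiling)
curl "http://127.0.0.1:7280/api/developer/heap-prof/stop"
```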
mod debug; -mod log_level; +#[cfg_attr(not(feature = "jemalloc-profiled"), path = "heap_prof_disabled.rs")] +mod heap_prof; +mod log_level; #[cfg_attr(not(feature = "pprof"), path = "pprof_disabled.rs")] mod pprof; - mod server; use debug::debug_handler; +use heap_prof::heap_prof_handlers; use log_level::log_level_handler; use pprof::pprof_handlers; use quickwit_cluster::Cluster; @@ -42,7 +44,8 @@ pub(crate) fn developer_api_routes( .and( debug_handler(cluster.clone()) .or(log_level_handler(env_filter_reload_fn.clone()).boxed()) - .or(pprof_handlers()), + .or(pprof_handlers()) + .or(heap_prof_handlers()), ) .recover(recover_fn) } From ffacbd03b1cc4e2a73c82cfe01c178894ab9664d Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 02/13] Improve wording and types --- quickwit/quickwit-common/src/alloc_tracker.rs | 72 ++++++++++--------- .../quickwit-common/src/jemalloc_profiled.rs | 66 +++++++++-------- .../src/developer_api/heap_prof.rs | 2 +- 3 files changed, 75 insertions(+), 65 deletions(-) diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs index 7c2bf3ae483..655e5962e7d 100644 --- a/quickwit/quickwit-common/src/alloc_tracker.rs +++ b/quickwit/quickwit-common/src/alloc_tracker.rs @@ -16,24 +16,29 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::Mutex; +use bytesize::ByteSize; use once_cell::sync::Lazy; -const DEFAULT_REPORTING_INTERVAL: u64 = 1024 * 1024 * 1024; - static ALLOCATION_TRACKER: Lazy> = Lazy::new(|| Mutex::new(Allocations::default())); #[derive(Debug)] struct Allocation { pub callsite_hash: u64, - pub size: u64, + pub size: ByteSize, } #[derive(Debug, Copy, Clone)] pub struct Statistic { pub count: u64, - pub size: u64, - pub last_print: u64, + pub size: ByteSize, + pub last_report: ByteSize, +} + +#[derive(Debug)] +enum Status { + Started { reporting_interval: ByteSize }, + Stopped, } /// WARN: @@ -43,8 +48,7 @@ pub struct Statistic { struct Allocations { memory_locations: HashMap, callsite_statistics: HashMap, - is_started: bool, - reporting_interval: u64, + status: Status, } impl Default for Allocations { @@ -52,23 +56,18 @@ impl Default for Allocations { Self { memory_locations: HashMap::with_capacity(128 * 1024), callsite_statistics: HashMap::with_capacity(32 * 1024), - is_started: false, - reporting_interval: DEFAULT_REPORTING_INTERVAL, + status: Status::Stopped, } } } -// pub fn log_dump() { -// tracing::info!(allocations=?ALLOCATION_TRACKER.lock().unwrap(), "dump"); -// } - -pub fn init(alloc_size_triggering_backtrace: Option) { +pub fn init(reporting_interval_bytes: u64) { let mut guard = ALLOCATION_TRACKER.lock().unwrap(); guard.memory_locations.clear(); guard.callsite_statistics.clear(); - guard.is_started = true; - guard.reporting_interval = - alloc_size_triggering_backtrace.unwrap_or(DEFAULT_REPORTING_INTERVAL); + guard.status = Status::Started { + reporting_interval: ByteSize(reporting_interval_bytes), + } } pub enum AllocRecordingResponse { @@ -86,11 +85,15 @@ pub enum AllocRecordingResponse { /// allocated size is reported. /// /// WARN: this function should not allocate! 
-pub fn record_allocation(callsite_hash: u64, size: u64, ptr: *mut u8) -> AllocRecordingResponse { +pub fn record_allocation( + callsite_hash: u64, + size_bytes: u64, + ptr: *mut u8, +) -> AllocRecordingResponse { let mut guard = ALLOCATION_TRACKER.lock().unwrap(); - if !guard.is_started { + let Status::Started { reporting_interval } = guard.status else { return AllocRecordingResponse::NotStarted; - } + }; if guard.memory_locations.capacity() == guard.memory_locations.len() { return AllocRecordingResponse::TrackerFull("memory_locations"); } @@ -101,26 +104,25 @@ pub fn record_allocation(callsite_hash: u64, size: u64, ptr: *mut u8) -> AllocRe ptr as usize, Allocation { callsite_hash, - size, + size: ByteSize(size_bytes), }, ); - let reporting_interval = guard.reporting_interval; let entry = guard .callsite_statistics .entry(callsite_hash) .and_modify(|stat| { stat.count += 1; - stat.size += size; + stat.size += size_bytes; }) .or_insert(Statistic { count: 1, - size, - last_print: 0, + size: ByteSize(size_bytes), + last_report: ByteSize(0), }); - let new_threshold_exceeded = entry.size > (entry.last_print + reporting_interval); + let new_threshold_exceeded = entry.size > (entry.last_report + reporting_interval); if new_threshold_exceeded { let reported_statistic = *entry; - entry.last_print = entry.size; + entry.last_report = entry.size; AllocRecordingResponse::ThresholdExceeded(reported_statistic) } else { AllocRecordingResponse::ThresholdNotExceeded @@ -130,7 +132,7 @@ pub fn record_allocation(callsite_hash: u64, size: u64, ptr: *mut u8) -> AllocRe /// WARN: this function should not allocate! pub fn record_deallocation(ptr: *mut u8) { let mut guard = ALLOCATION_TRACKER.lock().unwrap(); - if !guard.is_started { + if let Status::Stopped = guard.status { return; } let Some(Allocation { @@ -143,8 +145,10 @@ pub fn record_deallocation(ptr: *mut u8) { return; }; if let Entry::Occupied(mut content) = guard.callsite_statistics.entry(callsite_hash) { - content.get_mut().count -= 1; - content.get_mut().size -= size; + let new_size_bytes = content.get().size.0.saturating_sub(size.0); + let new_count = content.get().count.saturating_sub(1); + content.get_mut().count = new_count; + content.get_mut().size = ByteSize(new_size_bytes); if content.get().count == 0 { content.remove(); } @@ -159,7 +163,7 @@ mod tests { #[test] #[serial_test::file_serial] fn test_record_allocation_and_deallocation() { - init(Some(2000)); + init(2000); let callsite_hash_1 = 777; let ptr_1 = 0x1 as *mut u8; @@ -175,8 +179,8 @@ mod tests { panic!("Expected ThresholdExceeded response"); }; assert_eq!(statistic.count, 2); - assert_eq!(statistic.size, 3000); - assert_eq!(statistic.last_print, 0); + assert_eq!(statistic.size, ByteSize(3000)); + assert_eq!(statistic.last_report, ByteSize(0)); record_deallocation(ptr_2); @@ -201,7 +205,7 @@ mod tests { #[test] #[serial_test::file_serial] fn test_tracker_full() { - init(Some(1024 * 1024 * 1024)); + init(1024 * 1024 * 1024); let memory_locations_capacity = ALLOCATION_TRACKER .lock() .unwrap() diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 179234f09b5..090dcd42d95 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -15,21 +15,23 @@ use std::alloc::{GlobalAlloc, Layout}; use std::hash::Hasher; use std::io::Write; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use 
bytesize::ByteSize; use tikv_jemallocator::Jemalloc; use tracing::{error, info}; use crate::alloc_tracker::{self, AllocRecordingResponse}; -const DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING: usize = 256 * 1024; +const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 256 * 1024; +const DEFAULT_REPORTING_INTERVAL_BYTES: u64 = 1024 * 1024 * 1024; // Atomics are used to communicate configurations between the start/stop // endpoints and the JemallocProfiled allocator wrapper. /// The minimum allocation size that is recorded by the tracker. -static MIN_ALLOC_SIZE_FOR_PROFILING: AtomicUsize = - AtomicUsize::new(DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING); +static MIN_ALLOC_BYTES_FOR_PROFILING: AtomicU64 = + AtomicU64::new(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING); /// Whether the profiling is started or not. static ENABLED: AtomicBool = AtomicBool::new(false); @@ -38,16 +40,16 @@ static ENABLED: AtomicBool = AtomicBool::new(false); /// /// This function uses a wrapper around the global Jemalloc allocator to /// instrument it. Each time an allocation bigger than -/// min_alloc_size_for_profiling is performed, it is recorded in a map and the -/// statistics for its call site are updated. +/// min_alloc_bytes_for_profiling is performed, it is recorded in a map and +/// the statistics for its call site are updated. /// /// During profiling, the statistics per call site are used to log when specific -/// thresholds are exceeded. For each call site, the allocated memory is -/// logged (with a backtrace) every time it exceeds the last logged allocated -/// memory by at least alloc_size_triggering_backtrace. +/// thresholds are exceeded. For each call site, the allocated memory is logged +/// (with a backtrace) every time it exceeds the last logged allocated memory by +/// at least alloc_bytes_triggering_backtrace. 
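A quick worked example of the reporting rule described above, for one callsite with an interval of 1 GiB (values are illustrative):

```rust
// With alloc_bytes_triggering_backtrace = 1 GiB:
//   cumulated live size 0.8 GiB -> nothing logged (not above last report + 1 GiB)
//   cumulated live size 1.2 GiB -> logged, last reported size becomes 1.2 GiB
//   cumulated live size 1.9 GiB -> nothing logged (not above 1.2 GiB + 1 GiB)
//   cumulated live size 2.3 GiB -> logged again, last reported size becomes 2.3 GiB
```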
pub fn start_profiling( - min_alloc_size_for_profiling: Option, - alloc_size_triggering_backtrace: Option, + min_alloc_bytes_for_profiling: Option, + alloc_bytes_triggering_backtrace: Option, ) { // Call backtrace once to warmup symbolization allocations (~30MB) backtrace::trace(|frame| { @@ -55,17 +57,21 @@ pub fn start_profiling( true }); - alloc_tracker::init(alloc_size_triggering_backtrace); + let alloc_bytes_triggering_backtrace = + alloc_bytes_triggering_backtrace.unwrap_or(DEFAULT_REPORTING_INTERVAL_BYTES); + alloc_tracker::init(alloc_bytes_triggering_backtrace); - let min_alloc_size_for_profiling = - min_alloc_size_for_profiling.unwrap_or(DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING); + let min_alloc_bytes_for_profiling = + min_alloc_bytes_for_profiling.unwrap_or(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING); // Use strong ordering to make sure all threads see these changes in this order - MIN_ALLOC_SIZE_FOR_PROFILING.store(min_alloc_size_for_profiling, Ordering::SeqCst); + MIN_ALLOC_BYTES_FOR_PROFILING.store(min_alloc_bytes_for_profiling, Ordering::SeqCst); let previously_enabled = ENABLED.swap(true, Ordering::SeqCst); info!( - min_alloc_size_for_profiling, - alloc_size_triggering_backtrace, previously_enabled, "heap profiling running" + min_alloc_for_profiling = %ByteSize(min_alloc_bytes_for_profiling), + alloc_triggering_backtrace = %ByteSize(alloc_bytes_triggering_backtrace), + previously_enabled, + "heap profiling running" ); } @@ -75,11 +81,9 @@ pub fn start_profiling( pub fn stop_profiling() { // Use strong ordering to make sure all threads see these changes in this order let previously_enabled = ENABLED.swap(false, Ordering::SeqCst); - MIN_ALLOC_SIZE_FOR_PROFILING.store(DEFAULT_MIN_ALLOC_SIZE_FOR_PROFILING, Ordering::SeqCst); + MIN_ALLOC_BYTES_FOR_PROFILING.store(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING, Ordering::SeqCst); info!(previously_enabled, "heap profiling stopped"); - // alloc_tracker::log_dump(); - // backtrace::clear_symbol_cache(); } /// Wraps the Jemalloc global allocator calls with tracking routines. @@ -126,16 +130,15 @@ unsafe impl GlobalAlloc for JemallocProfiled { } } +/// Uses backtrace::trace() which does allocate #[inline] fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { { let mut lock = std::io::stdout().lock(); let _ = writeln!( &mut lock, - "htrk callsite={} allocs={} size={}MiB", - callsite_hash, - stat.count, - stat.size / 1024 / 1024 + "htrk callsite={} allocs={} size={}", + callsite_hash, stat.count, stat.size ); backtrace::trace(|frame| { backtrace::resolve_frame(frame, |symbol| { @@ -150,6 +153,7 @@ fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { } } +/// Uses backtrace::trace() which does allocate #[inline] fn backtrace_hash() -> u64 { let mut hasher = fnv::FnvHasher::default(); @@ -160,22 +164,23 @@ fn backtrace_hash() -> u64 { hasher.finish() } +/// Warning: allocating inside this function can cause a deadlock. 
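To make the warning above concrete: the hazard is re-entering the allocator while the tracker's `std::sync::Mutex` is held. The chain below is an illustration (the nested allocation is hypothetical), not code from the patch:

```rust
// user code: Vec::push
//   -> JemallocProfiled::alloc
//     -> track_alloc_call
//       -> alloc_tracker::record_allocation     // ALLOCATION_TRACKER mutex held
//         -> hypothetical nested allocation     // e.g. a tracker map forced to grow
//           -> JemallocProfiled::alloc
//             -> track_alloc_call
//               -> alloc_tracker::record_allocation
//                 -> ALLOCATION_TRACKER.lock()  // same thread, non-reentrant mutex: deadlock
//
// This is why the tracker maps are pre-allocated and reported as full instead
// of being allowed to grow.
```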
#[cold] fn track_alloc_call(ptr: *mut u8, layout: Layout) { - if layout.size() > MIN_ALLOC_SIZE_FOR_PROFILING.load(Ordering::Relaxed) { + if layout.size() >= MIN_ALLOC_BYTES_FOR_PROFILING.load(Ordering::Relaxed) as usize { + // warning: backtrace_hash() allocates let callsite_hash = backtrace_hash(); let recording_response = alloc_tracker::record_allocation(callsite_hash, layout.size() as u64, ptr); match recording_response { AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => { + // warning: print_backtrace() allocates print_backtrace(callsite_hash, stat_for_trace); - // Could we use tracing to caracterize the call site here? - // tracing::info!(size = alloc_size_for_trace, "large alloc"); } - AllocRecordingResponse::TrackerFull(reason) => { + AllocRecordingResponse::TrackerFull(table_name) => { // this message might be displayed multiple times but that's fine - error!("{reason} full, profiling stopped"); + error!("heap profiling stopped, {table_name} full"); ENABLED.store(false, Ordering::Relaxed); } AllocRecordingResponse::ThresholdNotExceeded => {} @@ -184,9 +189,10 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { } } +/// Warning: allocating inside this function can cause a deadlock. #[cold] fn track_dealloc_call(ptr: *mut u8, layout: Layout) { - if layout.size() > MIN_ALLOC_SIZE_FOR_PROFILING.load(Ordering::Relaxed) { + if layout.size() >= MIN_ALLOC_BYTES_FOR_PROFILING.load(Ordering::Relaxed) as usize { alloc_tracker::record_deallocation(ptr); } } diff --git a/quickwit/quickwit-serve/src/developer_api/heap_prof.rs b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs index deff7311932..0e777bae3c8 100644 --- a/quickwit/quickwit-serve/src/developer_api/heap_prof.rs +++ b/quickwit/quickwit-serve/src/developer_api/heap_prof.rs @@ -21,7 +21,7 @@ pub fn heap_prof_handlers() -> impl Filter + Clone { #[derive(Deserialize)] struct ProfilerQueryParams { - min_alloc_size: Option, + min_alloc_size: Option, backtrace_every: Option, } From 79fecb2c1813c0e3fa0ae7ef14c742bb7bf44364 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 03/13] Better rationalization around nested allocations --- .../quickwit-common/src/jemalloc_profiled.rs | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 090dcd42d95..969c4498fcf 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -51,6 +51,12 @@ pub fn start_profiling( min_alloc_bytes_for_profiling: Option, alloc_bytes_triggering_backtrace: Option, ) { + #[cfg(miri)] + warn!( + "heap profiling is not supported with Miri because in that case the `backtrace` crate \ + allocates" + ); + // Call backtrace once to warmup symbolization allocations (~30MB) backtrace::trace(|frame| { backtrace::resolve_frame(frame, |_| {}); @@ -63,16 +69,19 @@ pub fn start_profiling( let min_alloc_bytes_for_profiling = min_alloc_bytes_for_profiling.unwrap_or(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING); - // Use strong ordering to make sure all threads see these changes in this order - MIN_ALLOC_BYTES_FOR_PROFILING.store(min_alloc_bytes_for_profiling, Ordering::SeqCst); - let previously_enabled = ENABLED.swap(true, Ordering::SeqCst); + // stdout() might allocate a buffer on first use. If the first allocation + // tracked comes from stdout, it will trigger a deadlock. Logging here + // guarantees that it doesn't happen. 
info!( min_alloc_for_profiling = %ByteSize(min_alloc_bytes_for_profiling), alloc_triggering_backtrace = %ByteSize(alloc_bytes_triggering_backtrace), - previously_enabled, "heap profiling running" ); + + // Use strong ordering to make sure all threads see these changes in this order + MIN_ALLOC_BYTES_FOR_PROFILING.store(min_alloc_bytes_for_profiling, Ordering::SeqCst); + ENABLED.store(true, Ordering::SeqCst); } /// Stops measuring heap allocations. @@ -91,6 +100,8 @@ pub fn stop_profiling() { /// The tracking routines are called only when [ENABLED] is set to true (calling /// [start_profiling()]), but we don't enforce any synchronization (we load it with /// Ordering::Relaxed) because it's fine to miss or record extra allocation events. +/// +/// It's important to ensure that no allocations are performed inside the allocator! pub struct JemallocProfiled(pub Jemalloc); unsafe impl GlobalAlloc for JemallocProfiled { @@ -130,7 +141,7 @@ unsafe impl GlobalAlloc for JemallocProfiled { } } -/// Uses backtrace::trace() which does allocate +/// Warning: stdout allocates a buffer on first use. #[inline] fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { { @@ -153,7 +164,6 @@ fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { } } -/// Uses backtrace::trace() which does allocate #[inline] fn backtrace_hash() -> u64 { let mut hasher = fnv::FnvHasher::default(); @@ -164,22 +174,22 @@ fn backtrace_hash() -> u64 { hasher.finish() } -/// Warning: allocating inside this function can cause a deadlock. +/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). #[cold] fn track_alloc_call(ptr: *mut u8, layout: Layout) { if layout.size() >= MIN_ALLOC_BYTES_FOR_PROFILING.load(Ordering::Relaxed) as usize { - // warning: backtrace_hash() allocates let callsite_hash = backtrace_hash(); let recording_response = alloc_tracker::record_allocation(callsite_hash, layout.size() as u64, ptr); match recording_response { AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => { - // warning: print_backtrace() allocates + // warning: stdout might allocate a buffer on first use print_backtrace(callsite_hash, stat_for_trace); } AllocRecordingResponse::TrackerFull(table_name) => { // this message might be displayed multiple times but that's fine + // warning: stdout might allocate a buffer on first use error!("heap profiling stopped, {table_name} full"); ENABLED.store(false, Ordering::Relaxed); } @@ -189,7 +199,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { } } -/// Warning: allocating inside this function can cause a deadlock. +/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). 
#[cold] fn track_dealloc_call(ptr: *mut u8, layout: Layout) { if layout.size() >= MIN_ALLOC_BYTES_FOR_PROFILING.load(Ordering::Relaxed) as usize { From e46a26078e9eaf7c550fd24f46197c2d9a95440c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 04/13] Pad the atomic flags to the cache line size --- .../quickwit-common/src/jemalloc_profiled.rs | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 969c4498fcf..371026d7881 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -23,30 +23,45 @@ use tracing::{error, info}; use crate::alloc_tracker::{self, AllocRecordingResponse}; -const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 256 * 1024; +const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 64 * 1024; const DEFAULT_REPORTING_INTERVAL_BYTES: u64 = 1024 * 1024 * 1024; -// Atomics are used to communicate configurations between the start/stop -// endpoints and the JemallocProfiled allocator wrapper. - -/// The minimum allocation size that is recorded by the tracker. -static MIN_ALLOC_BYTES_FOR_PROFILING: AtomicU64 = - AtomicU64::new(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING); +/// Atomics are used to communicate configurations between the start/stop +/// endpoints and the JemallocProfiled allocator wrapper. +/// +/// The flags are padded to avoid false sharing of the CPU cache line between +/// threads. 128 bytes is the cache line size on x86_64 and arm64. +#[repr(align(128))] +struct Flags { + /// The minimum allocation size that is recorded by the tracker. + min_alloc_bytes_for_profiling: AtomicU64, + /// Whether the profiling is started or not. + enabled: AtomicBool, +} -/// Whether the profiling is started or not. -static ENABLED: AtomicBool = AtomicBool::new(false); +static FLAGS: Flags = Flags { + min_alloc_bytes_for_profiling: AtomicU64::new(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING), + enabled: AtomicBool::new(false), +}; /// Starts measuring heap allocations and logs important leaks. /// /// This function uses a wrapper around the global Jemalloc allocator to -/// instrument it. Each time an allocation bigger than -/// min_alloc_bytes_for_profiling is performed, it is recorded in a map and -/// the statistics for its call site are updated. +/// instrument it. +/// +/// Each time an allocation bigger than min_alloc_bytes_for_profiling is +/// performed, it is recorded in a map and the statistics for its call site are +/// updated. Tracking allocations is costly because it requires acquiring a +/// global mutex. Setting a reasonable value for min_alloc_bytes_for_profiling +/// is crucial. For instance for a search aggregation request, tracking every +/// allocations (min_alloc_bytes_for_profiling=1) is typically 100x slower than +/// using a minimum of 64kB. /// /// During profiling, the statistics per call site are used to log when specific /// thresholds are exceeded. For each call site, the allocated memory is logged /// (with a backtrace) every time it exceeds the last logged allocated memory by -/// at least alloc_bytes_triggering_backtrace. +/// at least alloc_bytes_triggering_backtrace. This logging interval should +/// usually be set to a value of at least 500MB to limit the logging verbosity. 
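As a usage note, when the `jemalloc-profiled` feature is enabled the profiler can also be started programmatically rather than through the HTTP endpoints; a minimal sketch following the guidance above:

```rust
use quickwit_common::jemalloc_profiled::{start_profiling, stop_profiling};

fn enable_heap_profiling_sketch() {
    // Track allocations of 64 KiB and more, and log a backtrace each time a
    // callsite grows by another 1 GiB; `None` falls back to the defaults above.
    start_profiling(Some(64 * 1024), Some(1024 * 1024 * 1024));
    // ... run the workload under suspicion ...
    stop_profiling();
}
```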
pub fn start_profiling( min_alloc_bytes_for_profiling: Option, alloc_bytes_triggering_backtrace: Option, @@ -80,8 +95,10 @@ pub fn start_profiling( ); // Use strong ordering to make sure all threads see these changes in this order - MIN_ALLOC_BYTES_FOR_PROFILING.store(min_alloc_bytes_for_profiling, Ordering::SeqCst); - ENABLED.store(true, Ordering::SeqCst); + FLAGS + .min_alloc_bytes_for_profiling + .store(min_alloc_bytes_for_profiling, Ordering::SeqCst); + FLAGS.enabled.store(true, Ordering::SeqCst); } /// Stops measuring heap allocations. @@ -89,8 +106,10 @@ pub fn start_profiling( /// The allocation tracking tables and the symbol cache are not cleared. pub fn stop_profiling() { // Use strong ordering to make sure all threads see these changes in this order - let previously_enabled = ENABLED.swap(false, Ordering::SeqCst); - MIN_ALLOC_BYTES_FOR_PROFILING.store(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING, Ordering::SeqCst); + let previously_enabled = FLAGS.enabled.swap(false, Ordering::SeqCst); + FLAGS + .min_alloc_bytes_for_profiling + .store(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING, Ordering::SeqCst); info!(previously_enabled, "heap profiling stopped"); } @@ -108,7 +127,7 @@ unsafe impl GlobalAlloc for JemallocProfiled { #[inline] unsafe fn alloc(&self, layout: Layout) -> *mut u8 { let ptr = unsafe { self.0.alloc(layout) }; - if ENABLED.load(Ordering::Relaxed) { + if FLAGS.enabled.load(Ordering::Relaxed) { track_alloc_call(ptr, layout); } ptr @@ -117,7 +136,7 @@ unsafe impl GlobalAlloc for JemallocProfiled { #[inline] unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { let ptr = unsafe { self.0.alloc_zeroed(layout) }; - if ENABLED.load(Ordering::Relaxed) { + if FLAGS.enabled.load(Ordering::Relaxed) { track_alloc_call(ptr, layout); } ptr @@ -125,7 +144,7 @@ unsafe impl GlobalAlloc for JemallocProfiled { #[inline] unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { - if ENABLED.load(Ordering::Relaxed) { + if FLAGS.enabled.load(Ordering::Relaxed) { track_dealloc_call(ptr, layout); } unsafe { self.0.dealloc(ptr, layout) } @@ -134,7 +153,7 @@ unsafe impl GlobalAlloc for JemallocProfiled { #[inline] unsafe fn realloc(&self, old_ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { let new_ptr = unsafe { self.0.realloc(old_ptr, layout, new_size) }; - if ENABLED.load(Ordering::Relaxed) { + if FLAGS.enabled.load(Ordering::Relaxed) { track_realloc_call(old_ptr, new_ptr, layout, new_size); } new_ptr @@ -177,7 +196,7 @@ fn backtrace_hash() -> u64 { /// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). 
#[cold] fn track_alloc_call(ptr: *mut u8, layout: Layout) { - if layout.size() >= MIN_ALLOC_BYTES_FOR_PROFILING.load(Ordering::Relaxed) as usize { + if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { let callsite_hash = backtrace_hash(); let recording_response = alloc_tracker::record_allocation(callsite_hash, layout.size() as u64, ptr); @@ -191,7 +210,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { // this message might be displayed multiple times but that's fine // warning: stdout might allocate a buffer on first use error!("heap profiling stopped, {table_name} full"); - ENABLED.store(false, Ordering::Relaxed); + FLAGS.enabled.store(false, Ordering::Relaxed); } AllocRecordingResponse::ThresholdNotExceeded => {} AllocRecordingResponse::NotStarted => {} @@ -202,7 +221,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { /// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). #[cold] fn track_dealloc_call(ptr: *mut u8, layout: Layout) { - if layout.size() >= MIN_ALLOC_BYTES_FOR_PROFILING.load(Ordering::Relaxed) as usize { + if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { alloc_tracker::record_deallocation(ptr); } } From 272d2327d804bf2b7dc47a2eddb952d0708f6295 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 05/13] Add padding --- quickwit/quickwit-common/src/jemalloc_profiled.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 371026d7881..495cdcf31cc 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -37,11 +37,14 @@ struct Flags { min_alloc_bytes_for_profiling: AtomicU64, /// Whether the profiling is started or not. enabled: AtomicBool, + /// Padding to make sure we fill the cache line. + _padding: [u8; 119], // 128 (align) - 8 (u64) - 1 (bool) } static FLAGS: Flags = Flags { min_alloc_bytes_for_profiling: AtomicU64::new(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING), enabled: AtomicBool::new(false), + _padding: [0; 119], }; /// Starts measuring heap allocations and logs important leaks. 
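A note on the padding above: because `Flags` is `#[repr(align(128))]`, its size is already rounded up to 128 bytes, so the explicit `_padding` field mainly documents the layout. If a compile-time guarantee were preferred over the unit test added below, a const assertion (an assumption, not part of the patch) could sit next to the struct:

```rust
// Fails the build, rather than a test run, if `Flags` ever stops filling
// exactly one assumed 128-byte cache line.
const _: () = assert!(std::mem::size_of::<Flags>() == 128);
```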
@@ -235,3 +238,13 @@ fn track_realloc_call( ) { // TODO handle realloc } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_size_of_flags() { + assert_eq!(std::mem::size_of::(), 128); + } +} From 9ca3af6011a72f4cec71827df0ce3ae6dfaa1751 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 06/13] Add tokio trace --- quickwit/quickwit-cli/Cargo.toml | 3 +- quickwit/quickwit-cli/src/logger.rs | 54 +++++++++++++++---- .../quickwit-common/src/jemalloc_profiled.rs | 39 +++++++------- 3 files changed, 67 insertions(+), 29 deletions(-) diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 524e77fc8fd..0d2dcd0fa77 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -133,7 +133,8 @@ release-macos-feature-vendored-set = [ ] release-heap-profiled = [ "release-feature-set", - "jemalloc-profiled" + "jemalloc-profiled", + "tokio/tracing", ] [package.metadata.cargo-machete] diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 0f951ad7477..b95f159e6f8 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -27,6 +27,7 @@ use time::format_description::BorrowedFormatItem; use tracing::{Event, Level, Subscriber}; use tracing_subscriber::EnvFilter; use tracing_subscriber::field::RecordFields; +use tracing_subscriber::filter::filter_fn; use tracing_subscriber::fmt::FmtContext; use tracing_subscriber::fmt::format::{ DefaultFields, Format, FormatEvent, FormatFields, Full, Json, JsonFields, Writer, @@ -40,6 +41,14 @@ use crate::QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY; #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; +fn startup_env_filter(level: Level) -> anyhow::Result { + let env_filter = env::var("RUST_LOG") + .map(|_| EnvFilter::from_default_env()) + .or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN"))) + .context("failed to set up tracing env filter")?; + Ok(env_filter) +} + pub fn setup_logging_and_tracing( level: Level, ansi_colors: bool, @@ -52,13 +61,10 @@ pub fn setup_logging_and_tracing( return Ok(quickwit_serve::do_nothing_env_filter_reload_fn()); } } - let env_filter = env::var("RUST_LOG") - .map(|_| EnvFilter::from_default_env()) - .or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN"))) - .context("failed to set up tracing env filter")?; global::set_text_map_propagator(TraceContextPropagator::new()); - let (reloadable_env_filter, reload_handle) = tracing_subscriber::reload::Layer::new(env_filter); - let registry = tracing_subscriber::registry().with(reloadable_env_filter); + let (reloadable_env_filter, reload_handle) = + tracing_subscriber::reload::Layer::new(startup_env_filter(level)?); + let registry = tracing_subscriber::registry(); // Note on disabling ANSI characters: setting the ansi boolean on event format is insufficient. 
// It is thus set on layers, see https://github.com/tokio-rs/tracing/issues/1817 if get_bool_from_env(QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY, false) { @@ -90,6 +96,7 @@ pub fn setup_logging_and_tracing( let fmt_fields = event_format.format_fields(); registry + .with(reloadable_env_filter) .with(telemetry_layer) .with( tracing_subscriber::fmt::layer() @@ -102,17 +109,44 @@ pub fn setup_logging_and_tracing( } else { let event_format = EventFormat::get_from_env(); let fmt_fields = event_format.format_fields(); - - registry + #[cfg(not(feature = "jemalloc-profiled"))] + let registry = registry.with(reloadable_env_filter).with( + tracing_subscriber::fmt::layer() + .event_format(event_format) + .fmt_fields(fmt_fields) + .with_ansi(ansi_colors), + ); + // the heap profiler disables the env filter reloading because it seemed + // to overwrite the profiling filter + #[cfg(feature = "jemalloc-profiled")] + let registry = registry + .with( + tracing_subscriber::fmt::layer() + .event_format(EventFormat::get_from_env()) + .fmt_fields(EventFormat::get_from_env().format_fields()) + .with_ansi(ansi_colors) + .with_filter(filter_fn(|metadata| { + metadata.is_span() + || (metadata.is_event() + && metadata.level() == &Level::TRACE + && metadata + .target() + .starts_with("quickwit_common::jemalloc_profiled")) + })), + ) .with( tracing_subscriber::fmt::layer() .event_format(event_format) .fmt_fields(fmt_fields) - .with_ansi(ansi_colors), - ) + .with_ansi(ansi_colors) + .with_filter(startup_env_filter(level)?), + ); + + registry .try_init() .context("failed to register tracing subscriber")?; } + Ok(Arc::new(move |env_filter_def: &str| { let new_env_filter = EnvFilter::try_new(env_filter_def)?; reload_handle.reload(new_env_filter)?; diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 495cdcf31cc..7365d84eff0 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use bytesize::ByteSize; use tikv_jemallocator::Jemalloc; -use tracing::{error, info}; +use tracing::{error, info, trace}; use crate::alloc_tracker::{self, AllocRecordingResponse}; @@ -166,24 +166,22 @@ unsafe impl GlobalAlloc for JemallocProfiled { /// Warning: stdout allocates a buffer on first use. 
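The filter added above only keeps span data plus the profiler's own TRACE events, so how useful each profiler log line is depends on the spans entered by the allocating task. A sketch of caller-side scoping (the span name and workload function are illustrative, not part of the patch):

```rust
fn run_with_profiling_scope() {
    // Any large allocation recorded while this span is entered will carry
    // `heavy_aggregation{...}` in the profiler output.
    let _span = tracing::info_span!("heavy_aggregation", split_count = 42).entered();
    run_heavy_aggregation(); // hypothetical workload
}
```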
#[inline] fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { - { - let mut lock = std::io::stdout().lock(); - let _ = writeln!( - &mut lock, - "htrk callsite={} allocs={} size={}", - callsite_hash, stat.count, stat.size - ); - backtrace::trace(|frame| { - backtrace::resolve_frame(frame, |symbol| { - if let Some(symbole_name) = symbol.name() { - let _ = writeln!(&mut lock, "{}", symbole_name); - } else { - let _ = writeln!(&mut lock, "symb failed"); - } - }); - true + let mut lock = std::io::stdout().lock(); + let _ = writeln!( + &mut lock, + "htrk callsite={} allocs={} size={}", + callsite_hash, stat.count, stat.size, + ); + backtrace::trace(|frame| { + backtrace::resolve_frame(frame, |symbol| { + if let Some(symbole_name) = symbol.name() { + let _ = writeln!(&mut lock, "{}", symbole_name); + } else { + let _ = writeln!(&mut lock, "symb failed"); + } }); - } + true + }); } #[inline] @@ -206,8 +204,13 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { match recording_response { AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => { + // print both a backtrace and a Tokio tracing log // warning: stdout might allocate a buffer on first use print_backtrace(callsite_hash, stat_for_trace); + // for this to generate a complete trace: + // - tokio/tracing feature must be enabled + // - the tracing fmt subscriber filter must keep all spans for this event + trace!(callsite=callsite_hash,allocs=stat_for_trace.count,size=%stat_for_trace.size, "htrk"); } AllocRecordingResponse::TrackerFull(table_name) => { // this message might be displayed multiple times but that's fine From 4d199c081371145bc2b79644e145e1ad5a4e4ef7 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 07/13] Move backtrace to trace event format --- quickwit/Cargo.lock | 1 + quickwit/quickwit-cli/Cargo.toml | 3 +- quickwit/quickwit-cli/src/logger.rs | 118 +++++++++++++--- quickwit/quickwit-common/Cargo.toml | 1 + quickwit/quickwit-common/src/alloc_tracker.rs | 131 +++++++++++++++++- .../quickwit-common/src/jemalloc_profiled.rs | 68 ++++----- 6 files changed, 268 insertions(+), 54 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b9158f299d8..f82e883b89a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6745,6 +6745,7 @@ name = "quickwit-cli" version = "0.8.0" dependencies = [ "anyhow", + "backtrace", "bytesize", "chrono", "clap", diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 0d2dcd0fa77..69e634de611 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -22,6 +22,7 @@ path = "src/generate_markdown.rs" [dependencies] anyhow = { workspace = true } +backtrace = { workspace = true, optional = true } bytesize = { workspace = true } chrono = { workspace = true } clap = { workspace = true } @@ -82,6 +83,7 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } [features] jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"] jemalloc-profiled = [ + "dep:backtrace", "quickwit-common/jemalloc-profiled", "quickwit-serve/jemalloc-profiled" ] @@ -134,7 +136,6 @@ release-macos-feature-vendored-set = [ release-heap-profiled = [ "release-feature-set", "jemalloc-profiled", - "tokio/tracing", ] [package.metadata.cargo-machete] diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index b95f159e6f8..ee06f9070b4 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -21,13 +21,13 
@@ use opentelemetry::{KeyValue, global}; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::BatchConfigBuilder; use opentelemetry_sdk::{Resource, trace}; +use quickwit_common::jemalloc_profiled::JEMALLOC_PROFILER_TARGET; use quickwit_common::{get_bool_from_env, get_from_env_opt}; use quickwit_serve::{BuildInfo, EnvFilterReloadFn}; use time::format_description::BorrowedFormatItem; use tracing::{Event, Level, Subscriber}; use tracing_subscriber::EnvFilter; use tracing_subscriber::field::RecordFields; -use tracing_subscriber::filter::filter_fn; use tracing_subscriber::fmt::FmtContext; use tracing_subscriber::fmt::format::{ DefaultFields, Format, FormatEvent, FormatFields, Full, Json, JsonFields, Writer, @@ -116,22 +116,20 @@ pub fn setup_logging_and_tracing( .fmt_fields(fmt_fields) .with_ansi(ansi_colors), ); - // the heap profiler disables the env filter reloading because it seemed - // to overwrite the profiling filter + // The the jemalloc profiler formatter disables the env filter reloading + // because the layer seemed to overwrite the TRACE level span filter. #[cfg(feature = "jemalloc-profiled")] let registry = registry .with( tracing_subscriber::fmt::layer() - .event_format(EventFormat::get_from_env()) - .fmt_fields(EventFormat::get_from_env().format_fields()) + .event_format(jemalloc_profiled::ProfilingFormat::default()) + .fmt_fields(DefaultFields::new()) .with_ansi(ansi_colors) - .with_filter(filter_fn(|metadata| { + .with_filter(tracing_subscriber::filter::filter_fn(|metadata| { metadata.is_span() || (metadata.is_event() && metadata.level() == &Level::TRACE - && metadata - .target() - .starts_with("quickwit_common::jemalloc_profiled")) + && metadata.target() == JEMALLOC_PROFILER_TARGET) })), ) .with( @@ -154,6 +152,16 @@ pub fn setup_logging_and_tracing( })) } +/// We do not rely on the RFC3339 implementation, because it has a nanosecond precision. +/// See discussion here: https://github.com/time-rs/time/discussions/418 +fn time_formatter() -> UtcTime>> { + let time_format = time::format_description::parse( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z", + ) + .expect("time format description should be valid"); + UtcTime::new(time_format) +} + enum EventFormat<'a> { Full(Format>>>), Json(Format), @@ -170,17 +178,9 @@ impl EventFormat<'_> { let json_format = tracing_subscriber::fmt::format().json(); EventFormat::Json(json_format) } else { - // We do not rely on the RFC3339 implementation, because it has a nanosecond precision. 
- // See discussion here: https://github.com/time-rs/time/discussions/418 - let timer_format = time::format_description::parse( - "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z", - ) - .expect("time format description should be valid"); - let timer = UtcTime::new(timer_format); - let full_format = tracing_subscriber::fmt::format() .with_target(true) - .with_timer(timer); + .with_timer(time_formatter()); EventFormat::Full(full_format) } @@ -225,3 +225,85 @@ impl FormatFields<'_> for FieldFormat { } } } + +#[cfg(feature = "jemalloc-profiled")] +pub(super) mod jemalloc_profiled { + use std::fmt; + + use quickwit_common::jemalloc_profiled::JEMALLOC_PROFILER_TARGET; + use time::format_description::BorrowedFormatItem; + use tracing::{Event, Subscriber}; + use tracing_subscriber::fmt::format::Writer; + use tracing_subscriber::fmt::time::{FormatTime, UtcTime}; + use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields}; + use tracing_subscriber::registry::LookupSpan; + + use super::time_formatter; + + /// An event formatter specific to the memory profiler output. + /// + /// Besides printing the spans and the fields of the tracing event, it also + /// displays a backtrace. + pub struct ProfilingFormat { + time_formatter: UtcTime>>, + } + + impl Default for ProfilingFormat { + fn default() -> Self { + Self { + time_formatter: time_formatter(), + } + } + } + + impl FormatEvent for ProfilingFormat + where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, + { + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &Event<'_>, + ) -> fmt::Result { + self.time_formatter.format_time(&mut writer)?; + write!(writer, " {JEMALLOC_PROFILER_TARGET} ")?; + if let Some(scope) = ctx.event_scope() { + let mut seen = false; + + for span in scope.from_root() { + write!(writer, "{}", span.metadata().name())?; + seen = true; + + let ext = span.extensions(); + if let Some(fields) = &ext.get::>() { + if !fields.is_empty() { + write!(writer, "{{{}}}:", fields)?; + } + } + } + + if seen { + writer.write_char(' ')?; + } + }; + + ctx.format_fields(writer.by_ref(), event)?; + writeln!(writer)?; + + // Print a backtrace to help idenify the callsite + backtrace::trace(|frame| { + backtrace::resolve_frame(frame, |symbol| { + if let Some(symbole_name) = symbol.name() { + let _ = writeln!(writer, "{}", symbole_name); + } else { + let _ = writeln!(writer, "symb failed"); + } + }); + true + }); + Ok(()) + } + } +} diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 5f0e7f1375b..dceb3fa7008 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -51,6 +51,7 @@ tracing = { workspace = true } testsuite = [] named_tasks = ["tokio/tracing"] jemalloc-profiled = [ + "named_tasks", "dep:backtrace", "dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl" diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs index 655e5962e7d..de8c1e4e6d5 100644 --- a/quickwit/quickwit-common/src/alloc_tracker.rs +++ b/quickwit/quickwit-common/src/alloc_tracker.rs @@ -77,6 +77,15 @@ pub enum AllocRecordingResponse { NotStarted, } +pub enum ReallocRecordingResponse { + ThresholdExceeded { + statistics: Statistic, + callsite_hash: u64, + }, + ThresholdNotExceeded, + NotStarted, +} + /// Records an allocation and occasionally reports the cumulated allocation size /// for the provided callsite_hash. 
/// @@ -129,6 +138,58 @@ pub fn record_allocation( } } +/// Updates the the memory location and size of an existing allocation. Only +/// update the statistics if the original allocation was recorded. +pub fn record_reallocation( + new_size_bytes: u64, + old_ptr: *mut u8, + new_ptr: *mut u8, +) -> ReallocRecordingResponse { + let mut guard = ALLOCATION_TRACKER.lock().unwrap(); + let Status::Started { reporting_interval } = guard.status else { + return ReallocRecordingResponse::NotStarted; + }; + let (callsite_hash, old_size_bytes) = if old_ptr != new_ptr { + let Some(old_alloc) = guard.memory_locations.remove(&(old_ptr as usize)) else { + return ReallocRecordingResponse::ThresholdNotExceeded; + }; + guard.memory_locations.insert( + new_ptr as usize, + Allocation { + callsite_hash: old_alloc.callsite_hash, + size: ByteSize(new_size_bytes), + }, + ); + (old_alloc.callsite_hash, old_alloc.size.0) + } else { + let Some(alloc) = guard.memory_locations.get_mut(&(old_ptr as usize)) else { + return ReallocRecordingResponse::ThresholdNotExceeded; + }; + alloc.size = ByteSize(new_size_bytes); + (alloc.callsite_hash, alloc.size.0) + }; + + let delta = new_size_bytes as i64 - old_size_bytes as i64; + + let Some(current_stat) = guard.callsite_statistics.get_mut(&callsite_hash) else { + // tables are inconsistent, this should not happen + return ReallocRecordingResponse::ThresholdNotExceeded; + }; + current_stat.size = ByteSize((current_stat.size.0 as i64 + delta) as u64); + let new_threshold_exceeded = + current_stat.size > (current_stat.last_report + reporting_interval); + if new_threshold_exceeded { + let reported_statistic = *current_stat; + current_stat.last_report = current_stat.size; + ReallocRecordingResponse::ThresholdExceeded { + statistics: reported_statistic, + callsite_hash, + } + } else { + ReallocRecordingResponse::ThresholdNotExceeded + } +} + /// WARN: this function should not allocate! 
pub fn record_deallocation(ptr: *mut u8) { let mut guard = ALLOCATION_TRACKER.lock().unwrap(); @@ -157,7 +218,6 @@ pub fn record_deallocation(ptr: *mut u8) { #[cfg(test)] mod tests { - use super::*; #[test] @@ -202,6 +262,55 @@ mod tests { )); } + #[test] + #[serial_test::file_serial] + fn test_record_allocation_and_reallocation() { + init(2000); + let callsite_hash_1 = 777; + + let ptr_1 = 0x1 as *mut u8; + let response = record_allocation(callsite_hash_1, 1500, ptr_1); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + + let ptr_2 = 0x2 as *mut u8; + let response = record_allocation(callsite_hash_1, 1500, ptr_2); + let AllocRecordingResponse::ThresholdExceeded(statistic) = response else { + panic!("Expected ThresholdExceeded response"); + }; + assert_eq!(statistic.count, 2); + assert_eq!(statistic.size, ByteSize(3000)); + assert_eq!(statistic.last_report, ByteSize(0)); + + record_reallocation(2000, ptr_1, ptr_1); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + + record_reallocation(3000, ptr_1, ptr_1); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + + let ptr_3 = 0x3 as *mut u8; + record_reallocation(1500, ptr_1, ptr_3); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + + // once an existing allocation moved, it's previous location can be re-allocated + let response = record_allocation(callsite_hash_1, 1500, ptr_1); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + } + #[test] #[serial_test::file_serial] fn test_tracker_full() { @@ -220,6 +329,9 @@ mod tests { AllocRecordingResponse::ThresholdNotExceeded )); } + + // the map is full + let response = record_allocation(777, 10, (memory_locations_capacity + 1) as *mut u8); assert!(matches!( response, @@ -232,5 +344,22 @@ mod tests { .memory_locations .capacity(); assert_eq!(current_memory_locations_capacity, memory_locations_capacity); + + let response = record_reallocation( + 10, + std::ptr::null_mut::(), + (memory_locations_capacity + 1) as *mut u8, + ); + assert!(matches!( + response, + ReallocRecordingResponse::ThresholdNotExceeded, + )); + // make sure that the map didn't grow + let current_memory_locations_capacity = ALLOCATION_TRACKER + .lock() + .unwrap() + .memory_locations + .capacity(); + assert_eq!(current_memory_locations_capacity, memory_locations_capacity); } } diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 7365d84eff0..11abf9dd5a7 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -14,18 +14,19 @@ use std::alloc::{GlobalAlloc, Layout}; use std::hash::Hasher; -use std::io::Write; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use bytesize::ByteSize; use tikv_jemallocator::Jemalloc; use tracing::{error, info, trace}; -use crate::alloc_tracker::{self, AllocRecordingResponse}; +use crate::alloc_tracker::{self, AllocRecordingResponse, ReallocRecordingResponse}; const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 64 * 1024; const DEFAULT_REPORTING_INTERVAL_BYTES: u64 = 1024 * 1024 * 1024; +pub const JEMALLOC_PROFILER_TARGET: &str = "jemprof"; + /// Atomics are used to communicate configurations between the start/stop /// endpoints and the JemallocProfiled allocator wrapper. 
/// @@ -163,28 +164,17 @@ unsafe impl GlobalAlloc for JemallocProfiled { } } -/// Warning: stdout allocates a buffer on first use. -#[inline] -fn print_backtrace(callsite_hash: u64, stat: alloc_tracker::Statistic) { - let mut lock = std::io::stdout().lock(); - let _ = writeln!( - &mut lock, - "htrk callsite={} allocs={} size={}", - callsite_hash, stat.count, stat.size, - ); - backtrace::trace(|frame| { - backtrace::resolve_frame(frame, |symbol| { - if let Some(symbole_name) = symbol.name() { - let _ = writeln!(&mut lock, "{}", symbole_name); - } else { - let _ = writeln!(&mut lock, "symb failed"); - } - }); - true - }); +/// Prints both a backtrace and a Tokio tracing log +/// +/// Warning: stdout might allocate a buffer on first use +fn identify_callsite(callsite_hash: u64, stat: alloc_tracker::Statistic) { + // to generate a complete trace: + // - tokio/tracing feature must be enabled, otherwise un-instrumented tasks will not propagate + // spans + // - the tracing fmt subscriber filter must keep all spans for this event (at least debug) + trace!(target: JEMALLOC_PROFILER_TARGET, callsite=callsite_hash, allocs=stat.count, size=%stat.size); } -#[inline] fn backtrace_hash() -> u64 { let mut hasher = fnv::FnvHasher::default(); backtrace::trace(|frame| { @@ -204,13 +194,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { match recording_response { AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => { - // print both a backtrace and a Tokio tracing log - // warning: stdout might allocate a buffer on first use - print_backtrace(callsite_hash, stat_for_trace); - // for this to generate a complete trace: - // - tokio/tracing feature must be enabled - // - the tracing fmt subscriber filter must keep all spans for this event - trace!(callsite=callsite_hash,allocs=stat_for_trace.count,size=%stat_for_trace.size, "htrk"); + identify_callsite(callsite_hash, stat_for_trace); } AllocRecordingResponse::TrackerFull(table_name) => { // this message might be displayed multiple times but that's fine @@ -232,14 +216,30 @@ fn track_dealloc_call(ptr: *mut u8, layout: Layout) { } } +/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). 
#[cold] fn track_realloc_call( - _old_ptr: *mut u8, - _new_pointer: *mut u8, - _current_layout: Layout, - _new_size: usize, + old_ptr: *mut u8, + new_pointer: *mut u8, + current_layout: Layout, + new_size: usize, ) { - // TODO handle realloc + if current_layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize + { + let recording_response = + alloc_tracker::record_reallocation(new_size as u64, old_ptr, new_pointer); + + match recording_response { + ReallocRecordingResponse::ThresholdExceeded { + statistics, + callsite_hash, + } => { + identify_callsite(callsite_hash, statistics); + } + ReallocRecordingResponse::ThresholdNotExceeded => {} + ReallocRecordingResponse::NotStarted => {} + } + } } #[cfg(test)] From ee26bdcc543b0349e1da3339a72dc72ad477cac9 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 08/13] Improve loc for spawned task when using tokio/tracing --- quickwit/quickwit-common/src/lib.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 2b9fa51474d..8d3b73da67d 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -213,24 +213,28 @@ pub fn num_cpus() -> usize { // The following are helpers to build named tasks. // -// Named tasks require the tokio feature `tracing` to be enabled. -// If the `named_tasks` feature is disabled, this is no-op. +// Named tasks require the tokio feature `tracing` to be enabled. If the +// `named_tasks` feature is disabled, this is no-op. // -// By default, these function will just ignore the name passed and just act -// like a regular call to `tokio::spawn`. +// By default, these function will just ignore the name passed and just act like +// a regular call to `tokio::spawn`. // -// If the user compiles `quickwit-cli` with the `tokio-console` feature, -// then tasks will automatically be named. This is not just "visual sugar". +// If the user compiles `quickwit-cli` with the `tokio-console` feature, then +// tasks will automatically be named. This is not just "visual sugar". // -// Without names, tasks will only show their spawn site on tokio-console. -// This is a catastrophy for actors who all share the same spawn site. +// Without names, tasks will only show their spawn site on tokio-console. This +// is a catastrophy for actors who all share the same spawn site. +// +// The #[track_caller] annotation is used to show the right spawn site in the +// Tokio TRACE spans (only available when the tokio/tracing feature is on). // // # Naming // -// Actors will get named after their type, which is fine. -// For other tasks, please use `snake_case`. +// Actors will get named after their type, which is fine. For other tasks, +// please use `snake_case`. 
#[cfg(not(all(tokio_unstable, feature = "named_tasks")))] +#[track_caller] pub fn spawn_named_task<F>(future: F, _name: &'static str) -> tokio::task::JoinHandle<F::Output> where F: Future + Send + 'static, @@ -240,6 +244,7 @@ where } #[cfg(not(all(tokio_unstable, feature = "named_tasks")))] +#[track_caller] pub fn spawn_named_task_on<F>( future: F, _name: &'static str, @@ -253,6 +258,7 @@ where } #[cfg(all(tokio_unstable, feature = "named_tasks"))] +#[track_caller] pub fn spawn_named_task<F>(future: F, name: &'static str) -> tokio::task::JoinHandle<F::Output> where F: Future + Send + 'static, @@ -265,6 +271,7 @@ where } #[cfg(all(tokio_unstable, feature = "named_tasks"))] +#[track_caller] pub fn spawn_named_task_on<F>( future: F, name: &'static str, From 088a685969706dc2e915bd3b704c0a0733a4778c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 09/13] Move mutex from tracker --- quickwit/Cargo.toml | 1 + quickwit/quickwit-cli/src/logger.rs | 4 +- quickwit/quickwit-common/src/alloc_tracker.rs | 468 ++++++++++-------- .../quickwit-common/src/jemalloc_profiled.rs | 32 +- 4 files changed, 296 insertions(+), 209 deletions(-) diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 460dfb06756..ba77ae5e507 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -118,6 +118,7 @@ google-cloud-auth = "0.12.0" google-cloud-gax = "0.15.0" google-cloud-googleapis = { version = "0.10.0", features = ["pubsub"] } google-cloud-pubsub = "0.18.0" +hashbrown = "0.15" heck = "0.4.1" hex = "0.4.3" home = "0.5.4" diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index ee06f9070b4..9a9a3270595 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -21,7 +21,6 @@ use opentelemetry::{KeyValue, global}; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::BatchConfigBuilder; use opentelemetry_sdk::{Resource, trace}; -use quickwit_common::jemalloc_profiled::JEMALLOC_PROFILER_TARGET; use quickwit_common::{get_bool_from_env, get_from_env_opt}; use quickwit_serve::{BuildInfo, EnvFilterReloadFn}; use time::format_description::BorrowedFormatItem; @@ -129,7 +128,8 @@ pub fn setup_logging_and_tracing( metadata.is_span() || (metadata.is_event() && metadata.level() == &Level::TRACE - && metadata.target() == JEMALLOC_PROFILER_TARGET) + && metadata.target() + == quickwit_common::jemalloc_profiled::JEMALLOC_PROFILER_TARGET) })), ) .with( diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs index de8c1e4e6d5..0c9459fc9fa 100644 --- a/quickwit/quickwit-common/src/alloc_tracker.rs +++ b/quickwit/quickwit-common/src/alloc_tracker.rs @@ -14,13 +14,8 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; -use std::sync::Mutex; use bytesize::ByteSize; -use once_cell::sync::Lazy; - -static ALLOCATION_TRACKER: Lazy<Mutex<Allocations>> = - Lazy::new(|| Mutex::new(Allocations::default())); #[derive(Debug)] struct Allocation { @@ -29,14 +24,14 @@ struct Allocation { } #[derive(Debug, Copy, Clone)] -pub struct Statistic { +pub struct AllocStat { pub count: u64, pub size: ByteSize, pub last_report: ByteSize, } #[derive(Debug)] -enum Status { +enum TrackerStatus { Started { reporting_interval: ByteSize }, Stopped, } @@ -45,33 +40,33 @@ enum Status { /// - keys and values in these maps should not allocate!
/// - we assume HashMaps don't allocate if their capacity is not exceeded #[derive(Debug)] -struct Allocations { +pub struct Allocations { memory_locations: HashMap<usize, Allocation>, - callsite_statistics: HashMap<u64, Statistic>, - status: Status, + max_tracked_memory_locations: usize, + callsite_statistics: HashMap<u64, AllocStat>, + max_tracked_callsites: usize, + status: TrackerStatus, } impl Default for Allocations { fn default() -> Self { + let max_tracked_memory_locations = 128 * 1024; + let max_tracked_callsites = 32 * 1024; + // TODO: We use a load factor of 0.5 to avoid resizing. There is no + // strict guarantee with std::collections::HashMap that it's enough, but + // it seems to be the case in practice (see test_tracker_full). Self { - memory_locations: HashMap::with_capacity(128 * 1024), - callsite_statistics: HashMap::with_capacity(32 * 1024), - status: Status::Stopped, + memory_locations: HashMap::with_capacity(2 * max_tracked_memory_locations), + max_tracked_memory_locations, + callsite_statistics: HashMap::with_capacity(2 * max_tracked_callsites), + max_tracked_callsites, + status: TrackerStatus::Stopped, } } } -pub fn init(reporting_interval_bytes: u64) { - let mut guard = ALLOCATION_TRACKER.lock().unwrap(); - guard.memory_locations.clear(); - guard.callsite_statistics.clear(); - guard.status = Status::Started { - reporting_interval: ByteSize(reporting_interval_bytes), - } -} - pub enum AllocRecordingResponse { - ThresholdExceeded(Statistic), + ThresholdExceeded(AllocStat), ThresholdNotExceeded, TrackerFull(&'static str), NotStarted, @@ -79,139 +74,149 @@ pub enum AllocRecordingResponse { pub enum ReallocRecordingResponse { ThresholdExceeded { - statistics: Statistic, + statistics: AllocStat, callsite_hash: u64, }, ThresholdNotExceeded, NotStarted, } -/// Records an allocation and occasionally reports the cumulated allocation size -/// for the provided callsite_hash. -/// -/// Every time a the total allocated size with the same callsite_hash -/// exceeds the previous reported value by at least reporting_interval, that -/// allocated size is reported. -/// -/// WARN: this function should not allocate!
-pub fn record_allocation( - callsite_hash: u64, - size_bytes: u64, - ptr: *mut u8, -) -> AllocRecordingResponse { - let mut guard = ALLOCATION_TRACKER.lock().unwrap(); - let Status::Started { reporting_interval } = guard.status else { - return AllocRecordingResponse::NotStarted; - }; - if guard.memory_locations.capacity() == guard.memory_locations.len() { - return AllocRecordingResponse::TrackerFull("memory_locations"); - } - if guard.callsite_statistics.capacity() == guard.callsite_statistics.len() { - return AllocRecordingResponse::TrackerFull("memory_locations"); - } - guard.memory_locations.insert( - ptr as usize, - Allocation { - callsite_hash, - size: ByteSize(size_bytes), - }, - ); - let entry = guard - .callsite_statistics - .entry(callsite_hash) - .and_modify(|stat| { - stat.count += 1; - stat.size += size_bytes; - }) - .or_insert(Statistic { - count: 1, - size: ByteSize(size_bytes), - last_report: ByteSize(0), - }); - let new_threshold_exceeded = entry.size > (entry.last_report + reporting_interval); - if new_threshold_exceeded { - let reported_statistic = *entry; - entry.last_report = entry.size; - AllocRecordingResponse::ThresholdExceeded(reported_statistic) - } else { - AllocRecordingResponse::ThresholdNotExceeded +impl Allocations { + pub fn init(&mut self, reporting_interval_bytes: u64) { + self.memory_locations.clear(); + self.callsite_statistics.clear(); + self.status = TrackerStatus::Started { + reporting_interval: ByteSize(reporting_interval_bytes), + } } -} -/// Updates the the memory location and size of an existing allocation. Only -/// update the statistics if the original allocation was recorded. -pub fn record_reallocation( - new_size_bytes: u64, - old_ptr: *mut u8, - new_ptr: *mut u8, -) -> ReallocRecordingResponse { - let mut guard = ALLOCATION_TRACKER.lock().unwrap(); - let Status::Started { reporting_interval } = guard.status else { - return ReallocRecordingResponse::NotStarted; - }; - let (callsite_hash, old_size_bytes) = if old_ptr != new_ptr { - let Some(old_alloc) = guard.memory_locations.remove(&(old_ptr as usize)) else { - return ReallocRecordingResponse::ThresholdNotExceeded; + /// Records an allocation and occasionally reports the cumulated allocation size + /// for the provided callsite_hash. + /// + /// Every time a the total allocated size with the same callsite_hash + /// exceeds the previous reported value by at least reporting_interval, that + /// allocated size is reported. + /// + /// WARN: this function should not allocate! 
+ pub fn record_allocation( + &mut self, + callsite_hash: u64, + size_bytes: u64, + ptr: *mut u8, + ) -> AllocRecordingResponse { + let TrackerStatus::Started { reporting_interval } = self.status else { + return AllocRecordingResponse::NotStarted; }; - guard.memory_locations.insert( - new_ptr as usize, + if self.max_tracked_memory_locations == self.memory_locations.len() { + return AllocRecordingResponse::TrackerFull("memory_locations"); + } + if self.max_tracked_callsites == self.callsite_statistics.len() { + return AllocRecordingResponse::TrackerFull("memory_locations"); + } + self.memory_locations.insert( + ptr as usize, Allocation { - callsite_hash: old_alloc.callsite_hash, - size: ByteSize(new_size_bytes), + callsite_hash, + size: ByteSize(size_bytes), }, ); - (old_alloc.callsite_hash, old_alloc.size.0) - } else { - let Some(alloc) = guard.memory_locations.get_mut(&(old_ptr as usize)) else { + let entry = self + .callsite_statistics + .entry(callsite_hash) + .and_modify(|stat| { + stat.count += 1; + stat.size += size_bytes; + }) + .or_insert(AllocStat { + count: 1, + size: ByteSize(size_bytes), + last_report: ByteSize(0), + }); + let new_threshold_exceeded = entry.size >= (entry.last_report + reporting_interval); + if new_threshold_exceeded { + let reported_statistic = *entry; + entry.last_report = entry.size; + AllocRecordingResponse::ThresholdExceeded(reported_statistic) + } else { + AllocRecordingResponse::ThresholdNotExceeded + } + } + + /// Updates the the memory location and size of an existing allocation. Only + /// update the statistics if the original allocation was recorded. + pub fn record_reallocation( + &mut self, + new_size_bytes: u64, + old_ptr: *mut u8, + new_ptr: *mut u8, + ) -> ReallocRecordingResponse { + let TrackerStatus::Started { reporting_interval } = self.status else { + return ReallocRecordingResponse::NotStarted; + }; + let (callsite_hash, old_size_bytes) = if old_ptr != new_ptr { + let Some(old_alloc) = self.memory_locations.remove(&(old_ptr as usize)) else { + return ReallocRecordingResponse::ThresholdNotExceeded; + }; + self.memory_locations.insert( + new_ptr as usize, + Allocation { + callsite_hash: old_alloc.callsite_hash, + size: ByteSize(new_size_bytes), + }, + ); + (old_alloc.callsite_hash, old_alloc.size.0) + } else { + let Some(alloc) = self.memory_locations.get_mut(&(old_ptr as usize)) else { + return ReallocRecordingResponse::ThresholdNotExceeded; + }; + let old_size_bytes = alloc.size.0; + alloc.size = ByteSize(new_size_bytes); + (alloc.callsite_hash, old_size_bytes) + }; + + let delta = new_size_bytes as i64 - old_size_bytes as i64; + + let Some(current_stat) = self.callsite_statistics.get_mut(&callsite_hash) else { + // tables are inconsistent, this should not happen return ReallocRecordingResponse::ThresholdNotExceeded; }; - alloc.size = ByteSize(new_size_bytes); - (alloc.callsite_hash, alloc.size.0) - }; - - let delta = new_size_bytes as i64 - old_size_bytes as i64; - - let Some(current_stat) = guard.callsite_statistics.get_mut(&callsite_hash) else { - // tables are inconsistent, this should not happen - return ReallocRecordingResponse::ThresholdNotExceeded; - }; - current_stat.size = ByteSize((current_stat.size.0 as i64 + delta) as u64); - let new_threshold_exceeded = - current_stat.size > (current_stat.last_report + reporting_interval); - if new_threshold_exceeded { - let reported_statistic = *current_stat; - current_stat.last_report = current_stat.size; - ReallocRecordingResponse::ThresholdExceeded { - statistics: reported_statistic, - 
callsite_hash, + current_stat.size = ByteSize((current_stat.size.0 as i64 + delta) as u64); + let new_threshold_exceeded = + current_stat.size >= (current_stat.last_report + reporting_interval); + if new_threshold_exceeded { + let reported_statistic = *current_stat; + current_stat.last_report = current_stat.size; + ReallocRecordingResponse::ThresholdExceeded { + statistics: reported_statistic, + callsite_hash, + } + } else { + ReallocRecordingResponse::ThresholdNotExceeded } - } else { - ReallocRecordingResponse::ThresholdNotExceeded } -} -/// WARN: this function should not allocate! -pub fn record_deallocation(ptr: *mut u8) { - let mut guard = ALLOCATION_TRACKER.lock().unwrap(); - if let Status::Stopped = guard.status { - return; - } - let Some(Allocation { - size, - callsite_hash, - .. - }) = guard.memory_locations.remove(&(ptr as usize)) - else { - // this was allocated before the tracking started - return; - }; - if let Entry::Occupied(mut content) = guard.callsite_statistics.entry(callsite_hash) { - let new_size_bytes = content.get().size.0.saturating_sub(size.0); - let new_count = content.get().count.saturating_sub(1); - content.get_mut().count = new_count; - content.get_mut().size = ByteSize(new_size_bytes); - if content.get().count == 0 { - content.remove(); + /// WARN: this function should not allocate! + pub fn record_deallocation(&mut self, ptr: *mut u8) { + if let TrackerStatus::Stopped = self.status { + return; + } + let Some(Allocation { + size, + callsite_hash, + .. + }) = self.memory_locations.remove(&(ptr as usize)) + else { + // this was allocated before the tracking started + return; + }; + if let Entry::Occupied(mut content) = self.callsite_statistics.entry(callsite_hash) { + let new_size_bytes = content.get().size.0.saturating_sub(size.0); + let new_count = content.get().count.saturating_sub(1); + content.get_mut().count = new_count; + content.get_mut().size = ByteSize(new_size_bytes); + if content.get().count == 0 { + content.remove(); + } } } } @@ -220,21 +225,25 @@ pub fn record_deallocation(ptr: *mut u8) { mod tests { use super::*; + fn as_ptr(i: usize) -> *mut u8 { + i as *mut u8 + } + #[test] - #[serial_test::file_serial] fn test_record_allocation_and_deallocation() { - init(2000); + let mut allocations = Allocations::default(); + allocations.init(2000); let callsite_hash_1 = 777; - let ptr_1 = 0x1 as *mut u8; - let response = record_allocation(callsite_hash_1, 1500, ptr_1); + let ptr_1 = as_ptr(1); + let response = allocations.record_allocation(callsite_hash_1, 1500, ptr_1); assert!(matches!( response, AllocRecordingResponse::ThresholdNotExceeded )); - let ptr_2 = 0x2 as *mut u8; - let response = record_allocation(callsite_hash_1, 1500, ptr_2); + let ptr_2 = as_ptr(2); + let response = allocations.record_allocation(callsite_hash_1, 1500, ptr_2); let AllocRecordingResponse::ThresholdExceeded(statistic) = response else { panic!("Expected ThresholdExceeded response"); }; @@ -242,11 +251,11 @@ mod tests { assert_eq!(statistic.size, ByteSize(3000)); assert_eq!(statistic.last_report, ByteSize(0)); - record_deallocation(ptr_2); + allocations.record_deallocation(ptr_2); // the threshold was already crossed - let ptr_3 = 0x3 as *mut u8; - let response = record_allocation(callsite_hash_1, 1500, ptr_3); + let ptr_3 = as_ptr(3); + let response = allocations.record_allocation(callsite_hash_1, 1500, ptr_3); assert!(matches!( response, AllocRecordingResponse::ThresholdNotExceeded @@ -254,8 +263,8 @@ mod tests { // this is a brand new call site with different statistics let 
callsite_hash_2 = 42; - let ptr_3 = 0x3 as *mut u8; - let response = record_allocation(callsite_hash_2, 1500, ptr_3); + let ptr_4 = as_ptr(4); + let response = allocations.record_allocation(callsite_hash_2, 1500, ptr_4); assert!(matches!( response, AllocRecordingResponse::ThresholdNotExceeded @@ -263,20 +272,20 @@ mod tests { } #[test] - #[serial_test::file_serial] fn test_record_allocation_and_reallocation() { - init(2000); + let mut allocations = Allocations::default(); + allocations.init(2000); let callsite_hash_1 = 777; - let ptr_1 = 0x1 as *mut u8; - let response = record_allocation(callsite_hash_1, 1500, ptr_1); + let ptr_1 = as_ptr(1); + let response = allocations.record_allocation(callsite_hash_1, 1500, ptr_1); assert!(matches!( response, AllocRecordingResponse::ThresholdNotExceeded )); - let ptr_2 = 0x2 as *mut u8; - let response = record_allocation(callsite_hash_1, 1500, ptr_2); + let ptr_2 = as_ptr(2); + let response = allocations.record_allocation(callsite_hash_1, 1500, ptr_2); let AllocRecordingResponse::ThresholdExceeded(statistic) = response else { panic!("Expected ThresholdExceeded response"); }; @@ -284,82 +293,143 @@ mod tests { assert_eq!(statistic.size, ByteSize(3000)); assert_eq!(statistic.last_report, ByteSize(0)); - record_reallocation(2000, ptr_1, ptr_1); + // alloc grows a little bit + let response = allocations.record_reallocation(2000, ptr_1, ptr_1); assert!(matches!( response, - AllocRecordingResponse::ThresholdNotExceeded + ReallocRecordingResponse::ThresholdNotExceeded )); - record_reallocation(3000, ptr_1, ptr_1); + // alloc grows a lot + let response = allocations.record_reallocation(4000, ptr_1, ptr_1); + let ReallocRecordingResponse::ThresholdExceeded { + statistics, + callsite_hash, + } = response + else { + panic!("Expected ThresholdExceeded response"); + }; + assert_eq!(statistics.count, 2); + assert_eq!(statistics.size, ByteSize(5500)); + assert_eq!(statistics.last_report, ByteSize(3000)); + assert_eq!(callsite_hash, callsite_hash_1); + + // alloc grows a little bit and moves + let ptr_3 = as_ptr(3); + let response = allocations.record_reallocation(4500, ptr_1, ptr_3); assert!(matches!( response, - AllocRecordingResponse::ThresholdNotExceeded + ReallocRecordingResponse::ThresholdNotExceeded )); - let ptr_3 = 0x3 as *mut u8; - record_reallocation(1500, ptr_1, ptr_3); - assert!(matches!( - response, - AllocRecordingResponse::ThresholdNotExceeded - )); + // alloc grows a lot and moves + let ptr_4 = as_ptr(4); + let response = allocations.record_reallocation(6000, ptr_3, ptr_4); + let ReallocRecordingResponse::ThresholdExceeded { + statistics, + callsite_hash, + } = response + else { + panic!("Expected ThresholdExceeded response"); + }; + assert_eq!(statistics.count, 2); + assert_eq!(statistics.size, ByteSize(7500)); + assert_eq!(statistics.last_report, ByteSize(5500)); + assert_eq!(callsite_hash, callsite_hash_1); // once an existing allocation moved, it's previous location can be re-allocated - let response = record_allocation(callsite_hash_1, 1500, ptr_1); + let response = allocations.record_allocation(callsite_hash_1, 2000, ptr_1); + let AllocRecordingResponse::ThresholdExceeded(statistics) = response else { + panic!("Expected ThresholdExceeded response"); + }; + assert_eq!(statistics.count, 3); + assert_eq!(statistics.size, ByteSize(9500)); + assert_eq!(statistics.last_report, ByteSize(7500)); + assert_eq!(callsite_hash, callsite_hash_1); + + // reallocation is ignored on unknown allocation + let ptr_404 = as_ptr(404); + let response = 
allocations.record_reallocation(10000, ptr_404, ptr_404); assert!(matches!( response, - AllocRecordingResponse::ThresholdNotExceeded + ReallocRecordingResponse::ThresholdNotExceeded )); } #[test] - #[serial_test::file_serial] fn test_tracker_full() { - init(1024 * 1024 * 1024); - let memory_locations_capacity = ALLOCATION_TRACKER - .lock() - .unwrap() + let mut allocations = Allocations::default(); + allocations.init(1024 * 1024 * 1024); + let max_tracked_locations = allocations.max_tracked_memory_locations; + + // Track a first allocation. This one is not removed thoughout this test. + let first_location_ptr = as_ptr(1); + let response = allocations.record_allocation(777, 10, first_location_ptr); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + let ref_addr = allocations .memory_locations - .capacity(); + .get(&(first_location_ptr as usize)) + .unwrap() as *const Allocation; + // Assert that no hashmap resize occurs by tracking the address + // stability of the first value. Using HashMap::capacity() proved not to + // be reliable (unclear spec). + let assert_locations_map_didnt_move = |allocations: &Allocations, loc: &str| { + assert_eq!( + allocations + .memory_locations + .get(&(first_location_ptr as usize)) + .unwrap() as *const Allocation, + ref_addr, + "{loc}", + ); + }; - for i in 0..memory_locations_capacity { - let ptr = (i + 1) as *mut u8; - let response = record_allocation(777, 10, ptr); + // fill the table + let moving_ptr_range = (first_location_ptr as usize + 1) + ..(first_location_ptr as usize + max_tracked_locations); + for i in moving_ptr_range.clone() { + let ptr = as_ptr(i); + let response = allocations.record_allocation(777, 10, ptr); assert!(matches!( response, AllocRecordingResponse::ThresholdNotExceeded )); + assert_locations_map_didnt_move(&allocations, "fill"); } + assert_eq!(allocations.memory_locations.len(), max_tracked_locations); - // the map is full - - let response = record_allocation(777, 10, (memory_locations_capacity + 1) as *mut u8); + // the table is full, no more allocation is tracked + let response = allocations.record_allocation(777, 10, as_ptr(moving_ptr_range.end)); assert!(matches!( response, AllocRecordingResponse::TrackerFull("memory_locations") )); - // make sure that the map didn't grow - let current_memory_locations_capacity = ALLOCATION_TRACKER - .lock() - .unwrap() - .memory_locations - .capacity(); - assert_eq!(current_memory_locations_capacity, memory_locations_capacity); + assert_locations_map_didnt_move(&allocations, "full"); + + // run a heavy insert/remove workload + let last_location = 10 * max_tracked_locations; + for i in moving_ptr_range.end..=last_location { + let removed_ptr = as_ptr(i - 1); + allocations.record_deallocation(removed_ptr); + let inserted_ptr = as_ptr(i); + let response = allocations.record_allocation(888, 10, inserted_ptr); + assert!(matches!( + response, + AllocRecordingResponse::ThresholdNotExceeded + )); + assert_locations_map_didnt_move(&allocations, "reinsert"); + } - let response = record_reallocation( - 10, - std::ptr::null_mut::(), - (memory_locations_capacity + 1) as *mut u8, - ); + // reallocations are fine because they don't create an entry in the map + let response = + allocations.record_reallocation(10, as_ptr(last_location), as_ptr(last_location + 1)); assert!(matches!( response, ReallocRecordingResponse::ThresholdNotExceeded, )); - // make sure that the map didn't grow - let current_memory_locations_capacity = ALLOCATION_TRACKER - .lock() - .unwrap() - 
.memory_locations - .capacity(); - assert_eq!(current_memory_locations_capacity, memory_locations_capacity); + assert_locations_map_didnt_move(&allocations, "realloc"); } } diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 11abf9dd5a7..6598d38b35b 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -14,13 +14,17 @@ use std::alloc::{GlobalAlloc, Layout}; use std::hash::Hasher; +use std::sync::Mutex; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use bytesize::ByteSize; +use once_cell::sync::Lazy; use tikv_jemallocator::Jemalloc; use tracing::{error, info, trace}; -use crate::alloc_tracker::{self, AllocRecordingResponse, ReallocRecordingResponse}; +use crate::alloc_tracker::{ + AllocRecordingResponse, AllocStat, Allocations, ReallocRecordingResponse, +}; const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 64 * 1024; const DEFAULT_REPORTING_INTERVAL_BYTES: u64 = 1024 * 1024 * 1024; @@ -48,6 +52,9 @@ static FLAGS: Flags = Flags { _padding: [0; 119], }; +static ALLOCATION_TRACKER: Lazy> = + Lazy::new(|| Mutex::new(Allocations::default())); + /// Starts measuring heap allocations and logs important leaks. /// /// This function uses a wrapper around the global Jemalloc allocator to @@ -84,7 +91,10 @@ pub fn start_profiling( let alloc_bytes_triggering_backtrace = alloc_bytes_triggering_backtrace.unwrap_or(DEFAULT_REPORTING_INTERVAL_BYTES); - alloc_tracker::init(alloc_bytes_triggering_backtrace); + ALLOCATION_TRACKER + .lock() + .unwrap() + .init(alloc_bytes_triggering_backtrace); let min_alloc_bytes_for_profiling = min_alloc_bytes_for_profiling.unwrap_or(DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING); @@ -167,7 +177,7 @@ unsafe impl GlobalAlloc for JemallocProfiled { /// Prints both a backtrace and a Tokio tracing log /// /// Warning: stdout might allocate a buffer on first use -fn identify_callsite(callsite_hash: u64, stat: alloc_tracker::Statistic) { +fn identify_callsite(callsite_hash: u64, stat: AllocStat) { // to generate a complete trace: // - tokio/tracing feature must be enabled, otherwise un-instrumented tasks will not propagate // spans @@ -189,8 +199,11 @@ fn backtrace_hash() -> u64 { fn track_alloc_call(ptr: *mut u8, layout: Layout) { if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { let callsite_hash = backtrace_hash(); - let recording_response = - alloc_tracker::record_allocation(callsite_hash, layout.size() as u64, ptr); + let recording_response = ALLOCATION_TRACKER.lock().unwrap().record_allocation( + callsite_hash, + layout.size() as u64, + ptr, + ); match recording_response { AllocRecordingResponse::ThresholdExceeded(stat_for_trace) => { @@ -212,7 +225,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { #[cold] fn track_dealloc_call(ptr: *mut u8, layout: Layout) { if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { - alloc_tracker::record_deallocation(ptr); + ALLOCATION_TRACKER.lock().unwrap().record_deallocation(ptr); } } @@ -226,8 +239,11 @@ fn track_realloc_call( ) { if current_layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { - let recording_response = - alloc_tracker::record_reallocation(new_size as u64, old_ptr, new_pointer); + let recording_response = ALLOCATION_TRACKER.lock().unwrap().record_reallocation( + new_size as u64, + old_ptr, + new_pointer, + ); match recording_response { 
ReallocRecordingResponse::ThresholdExceeded { From 861bd3e34bb1c5c93ab86fe3c6ba063bea344fc4 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 10/13] Add docker ci --- .github/workflows/publish_docker_images.yml | 6 ++++++ quickwit/quickwit-cli/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/publish_docker_images.yml b/.github/workflows/publish_docker_images.yml index ffd8c9b085b..6c5d583f302 100644 --- a/.github/workflows/publish_docker_images.yml +++ b/.github/workflows/publish_docker_images.yml @@ -62,6 +62,11 @@ jobs: echo "QW_COMMIT_DATE=$(TZ=UTC0 git log -1 --format=%cd --date=format-local:%Y-%m-%dT%H:%M:%SZ)" >> $GITHUB_ENV echo "QW_COMMIT_HASH=$(git rev-parse HEAD)" >> $GITHUB_ENV echo "QW_COMMIT_TAGS=$(git tag --points-at HEAD | tr '\n' ',')" >> $GITHUB_ENV + if [[ "${{ github.event_name }}" == "push" && "${{ github.ref_type }}" == "tag" && "${GITHUB_REF#refs/tags/}" == *"jemprof"* ]]; then + echo "CARGO_FEATURES=release-jemalloc-profiled" >> $GITHUB_ENV + else + echo "CARGO_FEATURES=release-feature-set" >> $GITHUB_ENV + fi - name: Build and push image uses: docker/build-push-action@v6 @@ -73,6 +78,7 @@ jobs: QW_COMMIT_DATE=${{ env.QW_COMMIT_DATE }} QW_COMMIT_HASH=${{ env.QW_COMMIT_HASH }} QW_COMMIT_TAGS=${{ env.QW_COMMIT_TAGS }} + CARGO_FEATURES=${{ env.CARGO_FEATURES }} labels: ${{ steps.meta.outputs.labels }} outputs: type=image,name=${{ env.REGISTRY_IMAGE }},push-by-digest=true,name-canonical=true,push=true diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 69e634de611..2ddbd21baae 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -133,7 +133,7 @@ release-macos-feature-vendored-set = [ "quickwit-metastore/postgres", "quickwit-doc-mapper/multilang", ] -release-heap-profiled = [ +release-jemalloc-profiled = [ "release-feature-set", "jemalloc-profiled", ] From 3e67ffc674c39e7765367189e3af7e350e1fa724 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 11/13] Refactor logger for readability --- quickwit/Cargo.toml | 1 - quickwit/quickwit-cli/src/logger.rs | 84 +++++++++++++------ .../quickwit-common/src/jemalloc_profiled.rs | 7 +- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index ba77ae5e507..460dfb06756 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -118,7 +118,6 @@ google-cloud-auth = "0.12.0" google-cloud-gax = "0.15.0" google-cloud-googleapis = { version = "0.10.0", features = ["pubsub"] } google-cloud-pubsub = "0.18.0" -hashbrown = "0.15" heck = "0.4.1" hex = "0.4.3" home = "0.5.4" diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 9a9a3270595..5a4421e31db 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -40,6 +40,8 @@ use crate::QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY; #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; +/// Load the default logging filter from the environment. The filter can later +/// be updated using the result callback of [setup_logging_and_tracing]. 
fn startup_env_filter(level: Level) -> anyhow::Result { let env_filter = env::var("RUST_LOG") .map(|_| EnvFilter::from_default_env()) @@ -115,30 +117,14 @@ pub fn setup_logging_and_tracing( .fmt_fields(fmt_fields) .with_ansi(ansi_colors), ); - // The the jemalloc profiler formatter disables the env filter reloading - // because the layer seemed to overwrite the TRACE level span filter. #[cfg(feature = "jemalloc-profiled")] - let registry = registry - .with( - tracing_subscriber::fmt::layer() - .event_format(jemalloc_profiled::ProfilingFormat::default()) - .fmt_fields(DefaultFields::new()) - .with_ansi(ansi_colors) - .with_filter(tracing_subscriber::filter::filter_fn(|metadata| { - metadata.is_span() - || (metadata.is_event() - && metadata.level() == &Level::TRACE - && metadata.target() - == quickwit_common::jemalloc_profiled::JEMALLOC_PROFILER_TARGET) - })), - ) - .with( - tracing_subscriber::fmt::layer() - .event_format(event_format) - .fmt_fields(fmt_fields) - .with_ansi(ansi_colors) - .with_filter(startup_env_filter(level)?), - ); + let registry = jemalloc_profiled::configure_registry( + registry, + event_format, + fmt_fields, + ansi_colors, + level, + )?; registry .try_init() @@ -226,25 +212,29 @@ impl FormatFields<'_> for FieldFormat { } } +/// Logger configurations specific to the jemalloc profiler. #[cfg(feature = "jemalloc-profiled")] pub(super) mod jemalloc_profiled { use std::fmt; use quickwit_common::jemalloc_profiled::JEMALLOC_PROFILER_TARGET; use time::format_description::BorrowedFormatItem; - use tracing::{Event, Subscriber}; - use tracing_subscriber::fmt::format::Writer; + use tracing::{Event, Level, Metadata, Subscriber}; + use tracing_subscriber::Layer; + use tracing_subscriber::filter::filter_fn; + use tracing_subscriber::fmt::format::{DefaultFields, Writer}; use tracing_subscriber::fmt::time::{FormatTime, UtcTime}; use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields}; + use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::LookupSpan; - use super::time_formatter; + use super::{EventFormat, FieldFormat, startup_env_filter, time_formatter}; /// An event formatter specific to the memory profiler output. /// /// Besides printing the spans and the fields of the tracing event, it also /// displays a backtrace. - pub struct ProfilingFormat { + struct ProfilingFormat { time_formatter: UtcTime>>, } @@ -306,4 +296,44 @@ pub(super) mod jemalloc_profiled { Ok(()) } } + + fn profiler_tracing_filter(metadata: &Metadata) -> bool { + metadata.is_span() + || (metadata.is_event() + && metadata.level() == &Level::TRACE + && metadata.target() == JEMALLOC_PROFILER_TARGET) + } + + /// Configures the regular logging layer and a specific layer that gathers + /// extra debug information for the jemalloc profiler. + /// + /// The the jemalloc profiler formatter disables the env filter reloading + /// because the [tracing_subscriber::reload::Layer] seems to overwrite the + /// TRACE level span filter even though it's applied to a separate layer. 
+ pub(super) fn configure_registry<S>( + registry: S, + event_format: EventFormat<'static>, + fmt_fields: FieldFormat, + ansi_colors: bool, + level: Level, + ) -> anyhow::Result<impl Subscriber + for<'span> LookupSpan<'span>> + where + S: Subscriber + for<'span> LookupSpan<'span>, + { + Ok(registry + .with( + tracing_subscriber::fmt::layer() + .event_format(ProfilingFormat::default()) + .fmt_fields(DefaultFields::new()) + .with_ansi(ansi_colors) + .with_filter(filter_fn(profiler_tracing_filter)), + ) + .with( + tracing_subscriber::fmt::layer() + .event_format(event_format) + .fmt_fields(fmt_fields) + .with_ansi(ansi_colors) + .with_filter(startup_env_filter(level)?), + )) + } } diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 6598d38b35b..15ddfda07ec 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -178,10 +178,11 @@ unsafe impl GlobalAlloc for JemallocProfiled { /// /// Warning: stdout might allocate a buffer on first use fn identify_callsite(callsite_hash: u64, stat: AllocStat) { - // to generate a complete trace: + // To generate a complete trace: // - tokio/tracing feature must be enabled, otherwise un-instrumented tasks will not propagate - // spans - // - the tracing fmt subscriber filter must keep all spans for this event (at least debug) + // parent spans + // - the tracing fmt subscriber filter must keep all spans for this event (TRACE level) + // See the logger configuration for more details. trace!(target: JEMALLOC_PROFILER_TARGET, callsite=callsite_hash, allocs=stat.count, size=%stat.size); } From 090d6ba3af645dff54a1199429cac36913caedaa Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:22 +0200 Subject: [PATCH 12/13] Fix typos --- quickwit/quickwit-cli/src/logger.rs | 12 +++--- quickwit/quickwit-common/src/alloc_tracker.rs | 12 +++--- .../quickwit-common/src/jemalloc_profiled.rs | 37 +++++++++---------- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 5a4421e31db..7399fc0b197 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -213,6 +213,9 @@ impl FormatFields<'_> for FieldFormat { } } /// Logger configurations specific to the jemalloc profiler. +/// +/// A custom event formatter is used to print the backtrace of the +/// profiling events. #[cfg(feature = "jemalloc-profiled")] pub(super) mod jemalloc_profiled { use std::fmt; @@ -232,8 +235,8 @@ pub(super) mod jemalloc_profiled { /// An event formatter specific to the memory profiler output. /// - /// Besides printing the spans and the fields of the tracing event, it also - /// displays a backtrace. + /// Also displays a backtrace after spans and the fields of the tracing + /// event (into separate lines).
struct ProfilingFormat { time_formatter: UtcTime>>, } @@ -298,10 +301,7 @@ pub(super) mod jemalloc_profiled { } fn profiler_tracing_filter(metadata: &Metadata) -> bool { - metadata.is_span() - || (metadata.is_event() - && metadata.level() == &Level::TRACE - && metadata.target() == JEMALLOC_PROFILER_TARGET) + metadata.is_span() || (metadata.is_event() && metadata.target() == JEMALLOC_PROFILER_TARGET) } /// Configures the regular logging layer and a specific layer that gathers diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs index 0c9459fc9fa..8f5d76327d4 100644 --- a/quickwit/quickwit-common/src/alloc_tracker.rs +++ b/quickwit/quickwit-common/src/alloc_tracker.rs @@ -90,11 +90,11 @@ impl Allocations { } } - /// Records an allocation and occasionally reports the cumulated allocation size - /// for the provided callsite_hash. + /// Records an allocation and occasionally reports the cumulated allocation + /// size for the provided callsite_hash. /// - /// Every time a the total allocated size with the same callsite_hash - /// exceeds the previous reported value by at least reporting_interval, that + /// Every time the total allocated size for a given callsite_hash exceeds + /// the previous reported value by at least reporting_interval, the new total /// allocated size is reported. /// /// WARN: this function should not allocate! @@ -111,7 +111,7 @@ impl Allocations { return AllocRecordingResponse::TrackerFull("memory_locations"); } if self.max_tracked_callsites == self.callsite_statistics.len() { - return AllocRecordingResponse::TrackerFull("memory_locations"); + return AllocRecordingResponse::TrackerFull("tracked_callsites"); } self.memory_locations.insert( ptr as usize, @@ -144,6 +144,8 @@ impl Allocations { /// Updates the the memory location and size of an existing allocation. Only /// update the statistics if the original allocation was recorded. + /// + /// WARN: this function should not allocate! pub fn record_reallocation( &mut self, new_size_bytes: u64, diff --git a/quickwit/quickwit-common/src/jemalloc_profiled.rs b/quickwit/quickwit-common/src/jemalloc_profiled.rs index 15ddfda07ec..14fe303f3a1 100644 --- a/quickwit/quickwit-common/src/jemalloc_profiled.rs +++ b/quickwit/quickwit-common/src/jemalloc_profiled.rs @@ -29,10 +29,12 @@ use crate::alloc_tracker::{ const DEFAULT_MIN_ALLOC_BYTES_FOR_PROFILING: u64 = 64 * 1024; const DEFAULT_REPORTING_INTERVAL_BYTES: u64 = 1024 * 1024 * 1024; +/// This custom target name is used to filter profiling events in the tracing +/// subscriber. It is also included in the printed log. pub const JEMALLOC_PROFILER_TARGET: &str = "jemprof"; /// Atomics are used to communicate configurations between the start/stop -/// endpoints and the JemallocProfiled allocator wrapper. +/// endpoints and the [JemallocProfiled] allocator wrapper. /// /// The flags are padded to avoid false sharing of the CPU cache line between /// threads. 128 bytes is the cache line size on x86_64 and arm64. @@ -130,11 +132,13 @@ pub fn stop_profiling() { /// Wraps the Jemalloc global allocator calls with tracking routines. /// -/// The tracking routines are called only when [ENABLED] is set to true (calling -/// [start_profiling()]), but we don't enforce any synchronization (we load it with -/// Ordering::Relaxed) because it's fine to miss or record extra allocation events. +/// The tracking routines are called only when FLAGS.enabled is set to true +/// (calling [start_profiling()]). 
We load it with [Ordering::Relaxed] because +/// it's fine to miss or record extra allocation events and prefer limiting the +/// performance impact when profiling is not enabled. /// -/// It's important to ensure that no allocations are performed inside the allocator! +/// Note: It's important to ensure that no allocations are performed inside the +/// allocator! It can cause an abort, a panic or even a deadlock. pub struct JemallocProfiled(pub Jemalloc); unsafe impl GlobalAlloc for JemallocProfiled { @@ -176,13 +180,13 @@ unsafe impl GlobalAlloc for JemallocProfiled { /// Prints both a backtrace and a Tokio tracing log /// -/// Warning: stdout might allocate a buffer on first use +/// Warning: stdout writer might allocate a buffer on first use fn identify_callsite(callsite_hash: u64, stat: AllocStat) { // To generate a complete trace: // - tokio/tracing feature must be enabled, otherwise un-instrumented tasks will not propagate // parent spans - // - the tracing fmt subscriber filter must keep all spans for this event (TRACE level) - // See the logger configuration for more details. + // - the tracing fmt subscriber filter must keep all spans for this event (TRACE level). See the + // logger configuration for more details. trace!(target: JEMALLOC_PROFILER_TARGET, callsite=callsite_hash, allocs=stat.count, size=%stat.size); } @@ -195,7 +199,7 @@ fn backtrace_hash() -> u64 { hasher.finish() } -/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). +/// Warning: this function should not allocate! #[cold] fn track_alloc_call(ptr: *mut u8, layout: Layout) { if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { @@ -212,7 +216,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { } AllocRecordingResponse::TrackerFull(table_name) => { // this message might be displayed multiple times but that's fine - // warning: stdout might allocate a buffer on first use + // warning: stdout writer might allocate a buffer on first use error!("heap profiling stopped, {table_name} full"); FLAGS.enabled.store(false, Ordering::Relaxed); } @@ -222,7 +226,7 @@ fn track_alloc_call(ptr: *mut u8, layout: Layout) { } } -/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). +/// Warning: this function should not allocate! #[cold] fn track_dealloc_call(ptr: *mut u8, layout: Layout) { if layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { @@ -230,20 +234,15 @@ fn track_dealloc_call(ptr: *mut u8, layout: Layout) { } } -/// Warning: allocating inside this function can cause an error (abort, panic or even deadlock). +/// Warning: this function should not allocate! 
#[cold] -fn track_realloc_call( - old_ptr: *mut u8, - new_pointer: *mut u8, - current_layout: Layout, - new_size: usize, -) { +fn track_realloc_call(old_ptr: *mut u8, new_ptr: *mut u8, current_layout: Layout, new_size: usize) { if current_layout.size() >= FLAGS.min_alloc_bytes_for_profiling.load(Ordering::Relaxed) as usize { let recording_response = ALLOCATION_TRACKER.lock().unwrap().record_reallocation( new_size as u64, old_ptr, - new_pointer, + new_ptr, ); match recording_response { From 14593048fd7e06550b55ee79ac8662aa9fbdd771 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 14 May 2025 16:07:23 +0200 Subject: [PATCH 13/13] Fix typo --- quickwit/quickwit-common/src/alloc_tracker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickwit/quickwit-common/src/alloc_tracker.rs b/quickwit/quickwit-common/src/alloc_tracker.rs index 8f5d76327d4..4db8dea6fa4 100644 --- a/quickwit/quickwit-common/src/alloc_tracker.rs +++ b/quickwit/quickwit-common/src/alloc_tracker.rs @@ -142,7 +142,7 @@ impl Allocations { } } - /// Updates the the memory location and size of an existing allocation. Only + /// Updates the memory location and size of an existing allocation. Only /// update the statistics if the original allocation was recorded. /// /// WARN: this function should not allocate!
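The threshold logic implemented by record_allocation and record_reallocation in the patches above boils down to one rule: a callsite's cumulative allocated size is reported every time it grows by at least the reporting interval since the last report. A rough standalone sketch of that rule (simplified to plain u64 byte counts instead of ByteSize, with the per-pointer and per-callsite maps omitted, and not taken from the patched code):

```rust
// Minimal sketch of the reporting-threshold rule used by the alloc tracker:
// report whenever the cumulative size has grown by at least
// `reporting_interval` bytes since the previous report.
struct CallsiteStat {
    size: u64,        // current cumulative allocated bytes for this callsite
    last_report: u64, // cumulative size at the time of the last report
}

fn should_report(stat: &mut CallsiteStat, reporting_interval: u64) -> bool {
    if stat.size >= stat.last_report + reporting_interval {
        stat.last_report = stat.size;
        return true;
    }
    false
}

fn main() {
    let mut stat = CallsiteStat { size: 0, last_report: 0 };
    let reporting_interval = 2000;
    for alloc_size in [1500u64, 1500, 2000, 500] {
        stat.size += alloc_size;
        let reported = should_report(&mut stat, reporting_interval);
        println!("cumulative={} reported={}", stat.size, reported);
    }
    // -> 1500 false, 3000 true, 5000 true, 5500 false
}
```

With a 2000-byte interval, reports fire at cumulative sizes 3000 and 5000, which mirrors the threshold behavior exercised by the unit tests in alloc_tracker.rs.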