diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index ff117ca8..9ab3fe6b 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -32,7 +32,7 @@ jobs:
         uses: actions-rs/cargo@v1
         with:
           command: clippy
-          args: --all-features -- -D warnings
+          args: --features=protobuf,flamegraph -- -D warnings
 
   test:
     name: Test
@@ -56,4 +56,4 @@ jobs:
         uses: actions-rs/cargo@v1
         with:
           command: test
-          args: --all-features
+          args: --features=protobuf,flamegraph
diff --git a/Cargo.toml b/Cargo.toml
index b583473d..d4c521d8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ default = ["cpp"]
 flamegraph = ["inferno"]
 protobuf = ["prost", "prost-derive", "prost-build"]
 cpp = ["symbolic-demangle/cpp"]
+heap = []
 
 [dependencies]
 backtrace = "0.3"
@@ -21,9 +22,9 @@ lazy_static = "1.4"
 libc = "^0.2.66"
 log = "0.4"
 nix = "0.19"
-parking_lot = "0.11"
 tempfile = "3.1"
 thiserror = "1.0"
+spin = "0.7"
 
 inferno = { version = "0.10", default-features = false, features = ["nameattr"], optional = true }
 prost = { version = "0.6", optional = true }
@@ -53,6 +54,10 @@ required-features = ["protobuf"]
 name = "multithread_flamegraph"
 required-features = ["flamegraph"]
 
+[[example]]
+name = "heap_profiler"
+required-features = ["protobuf", "flamegraph", "heap"]
+
 [[bench]]
 name = "collector"
 path = "benches/collector.rs"
diff --git a/examples/heap_profiler.rs b/examples/heap_profiler.rs
new file mode 100644
index 00000000..35ae2cb1
--- /dev/null
+++ b/examples/heap_profiler.rs
@@ -0,0 +1,43 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use pprof::protos::Message;
+use pprof::AllocRecorder;
+use std::alloc::System;
+use std::fs::File;
+use std::io::Write;
+
+#[global_allocator]
+static ALLOC: AllocRecorder<System> = AllocRecorder::new(System);
+
+fn main() {
+    let guard = ALLOC.profile().unwrap();
+
+    memory_leak(65536);
+
+    match guard.report().build() {
+        Ok(report) => {
+            let mut file = File::create("profile.pb").unwrap();
+            let profile = report.pprof().unwrap();
+
+            let mut content = Vec::new();
+            profile.encode(&mut content).unwrap();
+            file.write_all(&content).unwrap();
+
+            let file = File::create("flamegraph.svg").unwrap();
+            report.flamegraph(file).unwrap();
+
+            println!("{:?}", report);
+        }
+        Err(_) => {}
+    };
+}
+
+fn memory_leak(size: usize) {
+    let b = Box::new(vec![0; size]);
+    Box::leak(b);
+
+    if size > 0 {
+        memory_leak(size / 2);
+        memory_leak(size / 2);
+    }
+}
diff --git a/src/collector.rs b/src/collector.rs
index c77a7eae..ee56477c 100644
--- a/src/collector.rs
+++ b/src/collector.rs
@@ -342,6 +342,40 @@ mod tests {
         }
     }
 
+    #[test]
+    fn collector_minus_test() {
+        let mut collector = Collector::new().unwrap();
+        let mut real_map = BTreeMap::new();
+
+        for item in 0..(1 << 12) * 4 {
+            for _ in 0..(item % 4) {
+                collector.add(item, 2).unwrap();
+            }
+        }
+
+        for item in 0..(1 << 12) * 4 {
+            for _ in 0..(item % 4) {
+                collector.add(item, -1).unwrap();
+            }
+        }
+
+        collector.try_iter().unwrap().for_each(|entry| {
+            add_map(&mut real_map, &entry);
+        });
+
+        for item in 0..(1 << 12) * 4 {
+            let count = (item % 4) as isize;
+            match real_map.get(&item) {
+                Some(value) => {
+                    assert_eq!(count, *value);
+                }
+                None => {
+                    assert_eq!(count, 0);
+                }
+            }
+        }
+    }
+
     extern "C" {
         static mut __malloc_hook: Option<extern "C" fn(size: usize) -> *mut c_void>;
diff --git a/src/heap_profiler.rs b/src/heap_profiler.rs
new file mode 100644
index 00000000..3d0ffcbb
--- /dev/null
+++ b/src/heap_profiler.rs
@@ -0,0 +1,162 @@
+// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::alloc::{GlobalAlloc, Layout};
+use std::ops::Deref;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use backtrace::Frame;
+use spin::RwLock;
+
+use crate::profiler::Profiler;
+use crate::Error;
+use crate::ReportBuilder;
+use crate::Result;
+use crate::MAX_DEPTH;
+
+lazy_static::lazy_static! {
+    pub(crate) static ref HEAP_PROFILER: RwLock<Result<Profiler>> = RwLock::new(Profiler::new());
+}
+
+pub struct AllocRecorder<T: GlobalAlloc> {
+    inner: T,
+    profiling: AtomicBool,
+}
+
+impl<T: GlobalAlloc> AllocRecorder<T> {
+    pub const fn new(inner: T) -> AllocRecorder<T> {
+        AllocRecorder {
+            inner,
+            profiling: AtomicBool::new(false),
+        }
+    }
+
+    pub fn profile(&self) -> Result<HeapProfilerGuard<'static, '_, T>> {
+        match HEAP_PROFILER.write().as_mut() {
+            Err(err) => {
+                log::error!("Error in creating profiler: {}", err);
+                Err(Error::CreatingError)
+            }
+            Ok(profiler) => match profiler.start() {
+                Ok(()) => {
+                    self.start();
+
+                    Ok(HeapProfilerGuard::<'static, '_, T> {
+                        profiler: &HEAP_PROFILER,
+                        alloc: self,
+                    })
+                }
+                Err(err) => Err(err),
+            },
+        }
+    }
+
+    pub(crate) fn start(&self) {
+        self.profiling.store(true, Ordering::SeqCst)
+    }
+
+    pub(crate) fn stop(&self) {
+        self.profiling.store(false, Ordering::SeqCst)
+    }
+}
+
+pub struct HeapReportBuilder<'a, 'b, 'c, T: GlobalAlloc> {
+    report_builder: ReportBuilder<'a>,
+    guard: &'a HeapProfilerGuard<'b, 'c, T>,
+}
+
+impl<T: GlobalAlloc> Drop for HeapReportBuilder<'_, '_, '_, T> {
+    fn drop(&mut self) {
+        self.guard.alloc.start()
+    }
+}
+
+impl<'a, T: GlobalAlloc> Deref for HeapReportBuilder<'a, '_, '_, T> {
+    type Target = ReportBuilder<'a>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.report_builder
+    }
+}
+
+pub struct HeapProfilerGuard<'a, 'b, T: GlobalAlloc> {
+    profiler: &'a RwLock<Result<Profiler>>,
+    alloc: &'b AllocRecorder<T>,
+}
+
+impl<T: GlobalAlloc> HeapProfilerGuard<'_, '_, T> {
+    /// Generate a report
+    pub fn report(&self) -> HeapReportBuilder<'_, '_, '_, T> {
+        self.alloc.stop();
+
+        HeapReportBuilder {
+            report_builder: ReportBuilder::new(&self.profiler),
+            guard: &self,
+        }
+    }
+}
+
+impl<T: GlobalAlloc> Drop for HeapProfilerGuard<'_, '_, T> {
+    fn drop(&mut self) {
+        self.alloc.stop();
+
+        match self.profiler.write().as_mut() {
+            Err(_) => {}
+            Ok(profiler) => match profiler.init() {
+                Ok(()) => {}
+                Err(err) => log::error!("error while reinitializing profiler {}", err),
+            },
+        }
+    }
+}
+
+unsafe impl<T: GlobalAlloc> GlobalAlloc for AllocRecorder<T> {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        if self.profiling.load(Ordering::SeqCst) {
+            let mut guard = HEAP_PROFILER.write();
+            if let Ok(profiler) = guard.as_mut() {
+                let mut bt: [Frame; MAX_DEPTH] = std::mem::MaybeUninit::uninit().assume_init();
+                let mut index = 0;
+
+                backtrace::trace_unsynchronized(|frame| {
+                    if index < MAX_DEPTH {
+                        bt[index] = frame.clone();
+                        index += 1;
+                        true
+                    } else {
+                        false
+                    }
+                });
+
+                let size = (layout.size() + layout.align()) as isize;
+                profiler.sample(&bt[0..index], &[], 0, size);
+            }
+        }
+
+        self.inner.alloc(layout)
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        if self.profiling.load(Ordering::SeqCst) {
+            let mut guard = HEAP_PROFILER.write();
+            if let Ok(profiler) = guard.as_mut() {
+                let mut bt: [Frame; MAX_DEPTH] = std::mem::MaybeUninit::uninit().assume_init();
+                let mut index = 0;
+
+                backtrace::trace_unsynchronized(|frame| {
+                    if index < MAX_DEPTH {
+                        bt[index] = frame.clone();
+                        index += 1;
+                        true
+                    } else {
+                        false
+                    }
+                });
+
+                let size = (layout.size() + layout.align()) as isize;
+                profiler.sample(&bt[0..index], &[], 0, -size);
+            }
+        }
+
+        self.inner.dealloc(ptr, layout);
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index c0b0f22b..115b0ffe 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,8 @@
 //!};
 //! ```
 
+#![cfg_attr(feature = "heap", feature(const_fn))]
+
 /// Define the MAX supported stack depth. TODO: make this variable mutable.
 pub const MAX_DEPTH: usize = 32;
 
@@ -30,6 +32,9 @@ pub const MAX_THREAD_NAME: usize = 16;
 mod collector;
 mod error;
 mod frames;
+#[cfg(feature = "heap")]
+mod heap_profiler;
+
 mod profiler;
 mod report;
 mod timer;
@@ -37,6 +42,10 @@ mod timer;
 
 pub use self::collector::{Collector, StackHashCounter};
 pub use self::error::{Error, Result};
 pub use self::frames::{Frames, Symbol};
+
+#[cfg(feature = "heap")]
+pub use self::heap_profiler::{AllocRecorder, HeapProfilerGuard};
+
 pub use self::profiler::ProfilerGuard;
 pub use self::report::{Report, ReportBuilder};
diff --git a/src/profiler.rs b/src/profiler.rs
index 2a93d82b..da8c00e4 100644
--- a/src/profiler.rs
+++ b/src/profiler.rs
@@ -5,7 +5,7 @@ use std::os::raw::c_int;
 
 use backtrace::Frame;
 use nix::sys::signal;
-use parking_lot::RwLock;
+use spin::RwLock;
 
 use crate::collector::Collector;
 use crate::error::{Error, Result};
@@ -143,13 +143,13 @@ extern "C" fn perf_signal_handler(_signal: c_int) {
             write_thread_name(current_thread, &mut name);
 
             let name = unsafe { std::ffi::CStr::from_ptr(name_ptr) };
-            profiler.sample(&bt[0..index], name.to_bytes(), current_thread as u64);
+            profiler.sample(&bt[0..index], name.to_bytes(), current_thread as u64, 1);
         }
     }
 }
 
 impl Profiler {
-    fn new() -> Result<Profiler> {
+    pub fn new() -> Result<Profiler> {
         Ok(Profiler {
             data: Collector::new()?,
             sample_counter: 0,
@@ -171,7 +171,7 @@ impl Profiler {
         }
     }
 
-    fn init(&mut self) -> Result<()> {
+    pub(crate) fn init(&mut self) -> Result<()> {
         self.sample_counter = 0;
         self.data = Collector::new()?;
         self.running = false;
@@ -206,11 +206,17 @@ impl Profiler {
     }
 
     // This function has to be AS-safe
-    pub fn sample(&mut self, backtrace: &[Frame], thread_name: &[u8], thread_id: u64) {
+    pub fn sample(
+        &mut self,
+        backtrace: &[Frame],
+        thread_name: &[u8],
+        thread_id: u64,
+        count: isize,
+    ) {
         let frames = UnresolvedFrames::new(backtrace, thread_name, thread_id);
         self.sample_counter += 1;
 
-        if let Ok(()) = self.data.add(frames, 1) {}
+        if let Ok(()) = self.data.add(frames, count) {}
     }
 }
 
diff --git a/src/report.rs b/src/report.rs
index 91082f4f..430a94ed 100644
--- a/src/report.rs
+++ b/src/report.rs
@@ -3,11 +3,10 @@
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 
-use parking_lot::RwLock;
+use spin::RwLock;
 
 use crate::frames::{Frames, UnresolvedFrames};
 use crate::profiler::Profiler;
-
 use crate::{Error, Result};
 
 /// The final presentation of a report which is actually an `HashMap` from `Frames` to isize (count).