Skip to content

feat(base): enhanced query-level memory management #17358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
eb1e9f4
chore(base): test memory alloc bucket
zhang2014 Jan 23, 2025
f6f8c42
chore(base): embed tracker into large memory
zhang2014 Jan 23, 2025
7661aec
chore(base): embed tracker into large memory.
zhang2014 Feb 6, 2025
a9e7f61
chore(base): embed tracker into large memory.
zhang2014 Feb 6, 2025
6eca024
chore(base): embed tracker into large memory.
zhang2014 Feb 6, 2025
df65f83
chore(base): embed tracker into large memory.
zhang2014 Feb 7, 2025
d859c6e
chore(base): embed tracker into large memory.
zhang2014 Feb 8, 2025
95946ae
chore(base): embed tracker into large memory.
zhang2014 Feb 8, 2025
1c553ef
chore(base): embed tracker into large memory.
zhang2014 Feb 8, 2025
30051bf
chore(base): embed tracker into large memory.
zhang2014 Feb 8, 2025
9f39538
chore(base): embed tracker into large memory.
zhang2014 Feb 8, 2025
477ac52
chore(base): embed tracker into large memory.
zhang2014 Feb 14, 2025
cf6a6c2
chore(base): embed tracker into large memory.
zhang2014 Feb 14, 2025
6106179
chore(base): embed tracker into large memory.
zhang2014 Feb 14, 2025
e73aed6
chore(base): embed tracker into large memory.
zhang2014 Feb 14, 2025
872053c
chore(base): embed tracker into large memory.
zhang2014 Feb 14, 2025
ba753a3
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 14, 2025
5b3faf1
chore(base): embed tracker into large memory.
zhang2014 Feb 15, 2025
bfb5911
chore(base): embed tracker into large memory.
zhang2014 Feb 15, 2025
55e4326
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 17, 2025
ba22947
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 17, 2025
fe678ad
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 20, 2025
4bec968
chore(base): embed tracker into large memory.
zhang2014 Feb 23, 2025
f663325
Merge branch 'chore/test_alloc_bucket' of github.com:zhang2014/datafu…
zhang2014 Feb 23, 2025
4e6057c
chore(base): embed tracker into large memory.
zhang2014 Feb 23, 2025
7e9f381
chore(base): embed tracker into large memory.
zhang2014 Feb 23, 2025
7f7bb3b
chore(base): embed tracker into large memory.
zhang2014 Feb 23, 2025
814d7ca
chore(base): embed tracker into large memory.
zhang2014 Feb 23, 2025
d920b31
chore(base): embed tracker into large memory.
zhang2014 Feb 23, 2025
a088fe2
chore(base): embed tracker into large memory.
zhang2014 Feb 24, 2025
c4b7596
chore(base): embed tracker into large memory.
zhang2014 Feb 24, 2025
9c8064c
chore(base): embed tracker into large memory.
zhang2014 Feb 24, 2025
dbbef68
chore(base): embed tracker into large memory.
zhang2014 Feb 24, 2025
c2a1b8d
chore(base): embed tracker into large memory.
zhang2014 Feb 24, 2025
26ff093
chore(base): embed tracker into large memory.
zhang2014 Feb 24, 2025
45ec463
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 25, 2025
21363f9
chore(base): embed tracker into large memory.
zhang2014 Feb 25, 2025
36166ec
chore(base): embed tracker into large memory.
zhang2014 Feb 25, 2025
dc9a6cd
chore(base): embed tracker into large memory.
zhang2014 Feb 25, 2025
4e252c8
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 26, 2025
08885a6
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Feb 28, 2025
540f7ff
chore(base): embed tracker into large memory.
zhang2014 Feb 28, 2025
0ded685
chore(base): embed tracker into large memory.
zhang2014 Feb 28, 2025
1339d5f
Merge branch 'main' into chore/test_alloc_bucket
zhang2014 Mar 1, 2025
e8e3380
chore(base): embed tracker into large memory.
zhang2014 Mar 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/binaries/query/ee_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

mod entry;

use databend_common_base::mem_allocator::GlobalAllocator;
use databend_common_base::mem_allocator::TrackingGlobalAllocator;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::ThreadTracker;
use databend_common_config::InnerConfig;
Expand All @@ -34,7 +34,7 @@ use crate::entry::run_cmd;
use crate::entry::start_services;

#[global_allocator]
pub static GLOBAL_ALLOCATOR: GlobalAllocator = GlobalAllocator;
pub static GLOBAL_ALLOCATOR: TrackingGlobalAllocator = TrackingGlobalAllocator::create();

fn main() {
let binary_version = (*databend_common_config::DATABEND_COMMIT_VERSION).clone();
Expand Down
6 changes: 3 additions & 3 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::env;
use std::time::Duration;

use databend_common_base::mem_allocator::GlobalAllocator;
use databend_common_base::mem_allocator::TrackingGlobalAllocator;
use databend_common_base::runtime::set_alloc_error_hook;
use databend_common_base::runtime::GLOBAL_MEM_STAT;
use databend_common_config::Commands;
Expand Down Expand Up @@ -305,8 +305,8 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
"unlimited".to_string()
}
});
println!(" allocator: {}", GlobalAllocator::name());
println!(" config: {}", GlobalAllocator::conf());
println!(" allocator: {}", TrackingGlobalAllocator::name());
println!(" config: {}", TrackingGlobalAllocator::conf());

println!();
println!("Cluster: {}", {
Expand Down
4 changes: 2 additions & 2 deletions src/binaries/query/oss_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

mod entry;

use databend_common_base::mem_allocator::GlobalAllocator;
use databend_common_base::mem_allocator::TrackingGlobalAllocator;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::ThreadTracker;
use databend_common_config::InnerConfig;
Expand All @@ -35,7 +35,7 @@ use crate::entry::run_cmd;
use crate::entry::start_services;

#[global_allocator]
pub static GLOBAL_ALLOCATOR: GlobalAllocator = GlobalAllocator;
pub static GLOBAL_ALLOCATOR: TrackingGlobalAllocator = TrackingGlobalAllocator::create();

fn main() {
let binary_version = (*databend_common_config::DATABEND_COMMIT_VERSION).clone();
Expand Down
3 changes: 3 additions & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#![feature(variant_count)]
#![feature(ptr_alignment_type)]
#![feature(vec_into_raw_parts)]
#![feature(slice_ptr_get)]
#![feature(alloc_layout_extra)]
#![feature(let_chains)]

pub mod base;
pub mod containers;
Expand Down
86 changes: 53 additions & 33 deletions src/common/base/src/mem_allocator/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,42 @@ use std::alloc::Layout;
use std::ptr::null_mut;
use std::ptr::NonNull;

use crate::mem_allocator::tracker::MetaTrackerAllocator;
use crate::mem_allocator::DefaultAllocator;

pub type DefaultGlobalAllocator = GlobalAllocator<DefaultAllocator>;
pub type TrackingGlobalAllocator = GlobalAllocator<MetaTrackerAllocator<DefaultAllocator>>;

/// Global allocator, default is JeAllocator.

#[derive(Debug, Clone, Copy, Default)]
pub struct GlobalAllocator;
pub struct GlobalAllocator<T> {
inner: T,
}

impl GlobalAllocator<MetaTrackerAllocator<DefaultAllocator>> {
pub const fn create() -> GlobalAllocator<MetaTrackerAllocator<DefaultAllocator>> {
GlobalAllocator {
inner: MetaTrackerAllocator::create(DefaultAllocator::create()),
}
}

pub fn name() -> String {
DefaultAllocator::name()
}

pub fn conf() -> String {
DefaultAllocator::conf()
}
}

impl GlobalAllocator<DefaultAllocator> {
pub const fn create() -> GlobalAllocator<DefaultAllocator> {
GlobalAllocator {
inner: DefaultAllocator::create(),
}
}

impl GlobalAllocator {
pub fn name() -> String {
DefaultAllocator::name()
}
Expand All @@ -36,20 +64,20 @@ impl GlobalAllocator {
}
}

unsafe impl Allocator for GlobalAllocator {
unsafe impl<T: Allocator> Allocator for GlobalAllocator<T> {
#[inline(always)]
fn allocate(&self, layout: Layout) -> Result<NonNull<[u8]>, AllocError> {
DefaultAllocator::default().allocate(layout)
self.inner.allocate(layout)
}

#[inline(always)]
fn allocate_zeroed(&self, layout: Layout) -> Result<NonNull<[u8]>, AllocError> {
DefaultAllocator::default().allocate_zeroed(layout)
self.inner.allocate_zeroed(layout)
}

#[inline(always)]
unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
DefaultAllocator::default().deallocate(ptr, layout)
self.inner.deallocate(ptr, layout)
}

#[inline(always)]
Expand All @@ -59,7 +87,7 @@ unsafe impl Allocator for GlobalAllocator {
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
DefaultAllocator::default().grow(ptr, old_layout, new_layout)
self.inner.grow(ptr, old_layout, new_layout)
}

#[inline(always)]
Expand All @@ -69,7 +97,7 @@ unsafe impl Allocator for GlobalAllocator {
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
DefaultAllocator::default().grow_zeroed(ptr, old_layout, new_layout)
self.inner.grow_zeroed(ptr, old_layout, new_layout)
}

#[inline(always)]
Expand All @@ -79,32 +107,30 @@ unsafe impl Allocator for GlobalAllocator {
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
DefaultAllocator::default().shrink(ptr, old_layout, new_layout)
self.inner.shrink(ptr, old_layout, new_layout)
}
}

unsafe impl GlobalAlloc for GlobalAllocator {
unsafe impl<T: Allocator> GlobalAlloc for GlobalAllocator<T> {
#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
if let Ok(ptr) = GlobalAllocator.allocate(layout) {
ptr.as_ptr() as *mut u8
} else {
null_mut()
match self.allocate(layout) {
Ok(ptr) => ptr.as_ptr() as *mut u8,
Err(_) => null_mut(),
}
}

#[inline]
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
let ptr = NonNull::new(ptr).unwrap_unchecked();
GlobalAllocator.deallocate(ptr, layout);
self.deallocate(ptr, layout);
}

#[inline]
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
if let Ok(ptr) = GlobalAllocator.allocate_zeroed(layout) {
ptr.as_ptr() as *mut u8
} else {
null_mut()
match self.allocate_zeroed(layout) {
Ok(ptr) => ptr.as_ptr() as *mut u8,
Err(_) => null_mut(),
}
}

Expand All @@ -115,21 +141,15 @@ unsafe impl GlobalAlloc for GlobalAllocator {
let ptr = NonNull::new(ptr).unwrap_unchecked();
let new_layout = Layout::from_size_align(new_size, layout.align()).unwrap();
match layout.size().cmp(&new_size) {
Less => {
if let Ok(ptr) = GlobalAllocator.grow(ptr, layout, new_layout) {
ptr.as_ptr() as *mut u8
} else {
null_mut()
}
}
Greater => {
if let Ok(ptr) = GlobalAllocator.shrink(ptr, layout, new_layout) {
ptr.as_ptr() as *mut u8
} else {
null_mut()
}
}
Equal => ptr.as_ptr(),
Less => match self.grow(ptr, layout, new_layout) {
Ok(ptr) => ptr.as_ptr() as *mut u8,
Err(_) => null_mut(),
},
Greater => match self.shrink(ptr, layout, new_layout) {
Ok(ptr) => ptr.as_ptr() as *mut u8,
Err(_) => null_mut(),
},
}
}
}
20 changes: 4 additions & 16 deletions src/common/base/src/mem_allocator/jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
pub struct JEAllocator;

impl JEAllocator {
pub const fn create() -> JEAllocator {
JEAllocator
}

pub fn name() -> String {
"jemalloc".to_string()
}
Expand All @@ -44,7 +48,6 @@ pub mod linux {
use tikv_jemalloc_sys as ffi;

use super::JEAllocator;
use crate::runtime::ThreadTracker;

#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))]
const ALIGNOF_MAX_ALIGN_T: usize = 8;
Expand Down Expand Up @@ -77,8 +80,6 @@ pub mod linux {
unsafe impl Allocator for JEAllocator {
#[inline(always)]
fn allocate(&self, layout: Layout) -> Result<NonNull<[u8]>, AllocError> {
ThreadTracker::alloc(layout.size() as i64)?;

let data_address = if layout.size() == 0 {
unsafe { NonNull::new(layout.align() as *mut ()).unwrap_unchecked() }
} else {
Expand All @@ -92,8 +93,6 @@ pub mod linux {

#[inline(always)]
fn allocate_zeroed(&self, layout: Layout) -> Result<NonNull<[u8]>, AllocError> {
ThreadTracker::alloc(layout.size() as i64)?;

let data_address = if layout.size() == 0 {
unsafe { NonNull::new(layout.align() as *mut ()).unwrap_unchecked() }
} else {
Expand All @@ -108,8 +107,6 @@ pub mod linux {

#[inline(always)]
unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
ThreadTracker::dealloc(layout.size() as i64);

if layout.size() == 0 {
debug_assert_eq!(ptr.as_ptr() as usize, layout.align());
} else {
Expand All @@ -127,9 +124,6 @@ pub mod linux {
debug_assert_eq!(old_layout.align(), new_layout.align());
debug_assert!(old_layout.size() <= new_layout.size());

ThreadTracker::dealloc(old_layout.size() as i64);
ThreadTracker::alloc(new_layout.size() as i64)?;

let data_address = if new_layout.size() == 0 {
NonNull::new(new_layout.align() as *mut ()).unwrap_unchecked()
} else if old_layout.size() == 0 {
Expand All @@ -156,9 +150,6 @@ pub mod linux {
debug_assert_eq!(old_layout.align(), new_layout.align());
debug_assert!(old_layout.size() <= new_layout.size());

ThreadTracker::dealloc(old_layout.size() as i64);
ThreadTracker::alloc(new_layout.size() as i64)?;

let data_address = if new_layout.size() == 0 {
NonNull::new(new_layout.align() as *mut ()).unwrap_unchecked()
} else if old_layout.size() == 0 {
Expand Down Expand Up @@ -195,9 +186,6 @@ pub mod linux {
debug_assert_eq!(old_layout.align(), new_layout.align());
debug_assert!(old_layout.size() >= new_layout.size());

ThreadTracker::dealloc(old_layout.size() as i64);
ThreadTracker::alloc(new_layout.size() as i64)?;

if old_layout.size() == 0 {
debug_assert_eq!(ptr.as_ptr() as usize, old_layout.align());
let slice = std::slice::from_raw_parts_mut(ptr.as_ptr(), 0);
Expand Down
Loading
Loading