Skip to content

Commit fc78dbc

Browse files
authored
Support lgalloc for columnar (#31230)
Adds support for allocating memory for columnar's aligned type from lgalloc. Adds a flag `enable_columnar_lgalloc` to control the behavior. ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 734bad5 commit fc78dbc

File tree

7 files changed

+128
-21
lines changed

7 files changed

+128
-21
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/compute-types/src/dyncfgs.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ pub const ENABLE_COLUMNATION_LGALLOC: Config<bool> = Config::new(
5757
"Enable allocating regions from lgalloc.",
5858
);
5959

60+
/// Enable lgalloc for columnar.
61+
pub const ENABLE_COLUMNAR_LGALLOC: Config<bool> = Config::new(
62+
"enable_columnar_lgalloc",
63+
true,
64+
"Enable allocating aligned regions in columnar from lgalloc.",
65+
);
66+
6067
/// Enable lgalloc's eager memory return/reclamation feature.
6168
pub const ENABLE_LGALLOC_EAGER_RECLAMATION: Config<bool> = Config::new(
6269
"enable_lgalloc_eager_reclamation",
@@ -188,6 +195,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
188195
.add(&ENABLE_LGALLOC)
189196
.add(&ENABLE_COLUMNATION_LGALLOC)
190197
.add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
198+
.add(&ENABLE_COLUMNAR_LGALLOC)
191199
.add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL)
192200
.add(&DATAFLOW_MAX_INFLIGHT_BYTES)
193201
.add(&DATAFLOW_MAX_INFLIGHT_BYTES_CC)

src/compute/src/compute_state.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,9 @@ impl ComputeState {
303303
std::sync::atomic::Ordering::Relaxed,
304304
);
305305

306+
let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
307+
mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
308+
306309
// Remember the maintenance interval locally to avoid reading it from the config set on
307310
// every server iteration.
308311
self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

src/ore/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ rust_library(
2828
"assert",
2929
"async",
3030
"async-trait",
31+
"bytemuck",
3132
"bytes",
3233
"bytes_",
3334
"capture",
@@ -110,6 +111,7 @@ rust_test(
110111
"assert",
111112
"async",
112113
"async-trait",
114+
"bytemuck",
113115
"bytes",
114116
"bytes_",
115117
"capture",

src/ore/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ anyhow = { version = "1.0.95", optional = true }
1919
# The only exception to the above is the `either` crate, which itself has zero
2020
# dependencies and is widely considered to be basically part of the stdlib.
2121
async-trait = { version = "0.1.83", optional = true }
22+
bytemuck = { version = "1.21.0", optional = true }
2223
bytes = { version = "1.3.0", optional = true }
2324
chrono = { version = "0.4.39", default-features = false, features = [
2425
"std",
@@ -119,7 +120,7 @@ async = [
119120
bytes_ = ["bytes", "compact_bytes", "smallvec", "smallvec/const_generics", "region", "tracing_"]
120121
network = ["async", "bytes", "smallvec", "tonic", "tracing"]
121122
process = ["libc"]
122-
region = ["lgalloc"]
123+
region = ["dep:lgalloc", "dep:bytemuck"]
123124
tracing_ = [
124125
"anyhow",
125126
"tracing",

src/ore/src/region.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ impl<T> LgAllocRegion<T> {
170170
/// NOTE: We plan to deprecate this type soon. Users should switch to different types or the raw
171171
/// `lgalloc` API instead.
172172
#[derive(Debug)]
173+
#[must_use]
173174
pub enum Region<T> {
174175
/// A possibly empty heap-allocated region, represented as a vector.
175176
Heap(Vec<T>),
@@ -218,14 +219,12 @@ impl<T> Default for Region<T> {
218219
impl<T> Region<T> {
219220
/// Create a new empty region.
220221
#[inline]
221-
#[must_use]
222222
pub fn new_empty() -> Region<T> {
223223
Region::Heap(Vec::new())
224224
}
225225

226226
/// Create a new heap-allocated region of a specific capacity.
227227
#[inline]
228-
#[must_use]
229228
pub fn new_heap(capacity: usize) -> Region<T> {
230229
Region::Heap(Vec::with_capacity(capacity))
231230
}
@@ -255,7 +254,6 @@ impl<T> Region<T> {
255254
/// but can be larger if the implementation requires it.
256255
///
257256
/// Returns a [`Region::MMap`] if possible, and falls back to [`Region::Heap`] otherwise.
258-
#[must_use]
259257
pub fn new_auto(capacity: usize) -> Region<T> {
260258
if ENABLE_LGALLOC_REGION.load(std::sync::atomic::Ordering::Relaxed) {
261259
match Region::new_mmap(capacity) {
@@ -340,6 +338,8 @@ impl<T> Region<T> {
340338
/// Otherwise, the vector representation could try to reallocate the underlying memory
341339
/// using the global allocator, which would cause problems because the memory might not
342340
/// have originated from it. This is undefined behavior.
341+
///
342+
/// Private because it is too dangerous to expose to the public.
343343
#[inline]
344344
unsafe fn as_mut_vec(&mut self) -> &mut Vec<T> {
345345
match self {
@@ -349,6 +349,50 @@ impl<T> Region<T> {
349349
}
350350
}
351351

352+
impl<T: bytemuck::AnyBitPattern> Region<T> {
353+
/// Create a new file-based mapped region of a specific capacity, initialized to 0. The
354+
/// capacity of the returned region can be larger than requested to accommodate page sizes.
355+
///
356+
/// # Errors
357+
///
358+
/// Returns an error if the memory allocation fails.
359+
#[inline(always)]
360+
pub fn new_mmap_zeroed(capacity: usize) -> Result<Self, lgalloc::AllocError> {
361+
let (ptr, capacity, handle) = lgalloc::allocate::<T>(capacity)?;
362+
// SAFETY: `allocate` returns a valid memory block, and `T` supports a null-bit pattern.
363+
unsafe { ptr.as_ptr().write_bytes(0, capacity) }
364+
// SAFETY: `ptr` points to suitable memory.
365+
// It is UB to call `from_raw_parts` with a pointer not allocated from the global
366+
// allocator, but we accept this here because we promise never to reallocate the vector.
367+
let inner =
368+
ManuallyDrop::new(unsafe { Vec::from_raw_parts(ptr.as_ptr(), capacity, capacity) });
369+
let handle = Some(handle);
370+
Ok(Self::MMap(MMapRegion { inner, handle }))
371+
}
372+
373+
/// Allocate a zeroed region on the heap.
374+
#[inline(always)]
375+
pub fn new_heap_zeroed(capacity: usize) -> Self {
376+
Self::Heap(vec![T::zeroed(); capacity])
377+
}
378+
379+
/// Construct a new region with the specified capacity, initialized to 0.
380+
pub fn new_auto_zeroed(capacity: usize) -> Self {
381+
if ENABLE_LGALLOC_REGION.load(std::sync::atomic::Ordering::Relaxed) {
382+
match Region::new_mmap_zeroed(capacity) {
383+
Ok(r) => return r,
384+
Err(lgalloc::AllocError::Disabled)
385+
| Err(lgalloc::AllocError::InvalidSizeClass(_)) => {}
386+
Err(e) => {
387+
eprintln!("lgalloc error: {e}, falling back to heap");
388+
}
389+
}
390+
}
391+
// Fall-through
392+
Self::new_heap_zeroed(capacity)
393+
}
394+
}
395+
352396
impl<T: Clone> Region<T> {
353397
/// Extend the region from a slice.
354398
///

src/timely-util/src/containers.rs

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,49 @@ use timely::container::columnation::TimelyStack;
2424

2525
pub mod stack;
2626

27+
pub(crate) use alloc::alloc_aligned_zeroed;
28+
pub use alloc::{enable_columnar_lgalloc, set_enable_columnar_lgalloc};
29+
pub use builder::ColumnBuilder;
2730
pub use container::Column;
31+
pub use provided_builder::ProvidedBuilder;
32+
33+
mod alloc {
34+
use mz_ore::region::Region;
35+
36+
/// Allocate a region of memory with a capacity of at least `len` that is properly aligned
37+
/// and zeroed. The memory in Regions is always aligned to its content type.
38+
#[inline]
39+
pub(crate) fn alloc_aligned_zeroed<T: bytemuck::AnyBitPattern>(len: usize) -> Region<T> {
40+
if enable_columnar_lgalloc() {
41+
Region::new_auto_zeroed(len)
42+
} else {
43+
Region::new_heap_zeroed(len)
44+
}
45+
}
46+
47+
thread_local! {
48+
static ENABLE_COLUMNAR_LGALLOC: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
49+
}
50+
51+
/// Returns `true` if columnar allocations should come from lgalloc.
52+
#[inline]
53+
pub fn enable_columnar_lgalloc() -> bool {
54+
ENABLE_COLUMNAR_LGALLOC.get()
55+
}
56+
57+
/// Set whether columnar allocations should come from lgalloc. Applies to future allocations.
58+
pub fn set_enable_columnar_lgalloc(enabled: bool) {
59+
ENABLE_COLUMNAR_LGALLOC.set(enabled);
60+
}
61+
}
2862

2963
mod container {
3064
use columnar::bytes::{EncodeDecode, Sequence};
3165
use columnar::common::IterOwn;
3266
use columnar::Columnar;
3367
use columnar::Container as _;
3468
use columnar::{Clear, FromBytes, Index, Len};
69+
use mz_ore::region::Region;
3570
use timely::bytes::arc::Bytes;
3671
use timely::container::PushInto;
3772
use timely::dataflow::channels::ContainerBytes;
@@ -51,7 +86,7 @@ mod container {
5186
///
5287
/// Reasons could include misalignment, cloning of data, or wanting
5388
/// to release the `Bytes` as a scarce resource.
54-
Align(Box<[u64]>),
89+
Align(Region<u64>),
5590
}
5691

5792
impl<C: Columnar> Column<C> {
@@ -90,11 +125,15 @@ mod container {
90125
Column::Typed(t) => Column::Typed(t.clone()),
91126
Column::Bytes(b) => {
    assert_eq!(b.len() % 8, 0);
    let words = b.len() / 8;
    let mut alloc: Region<u64> = super::alloc_aligned_zeroed(words);
    // The region's length can exceed `words` (mapped regions may be rounded up), and
    // `copy_from_slice` panics on a length mismatch, so copy into exactly the first
    // `words` elements and leave any remainder zeroed.
    bytemuck::cast_slice_mut(&mut alloc[..words]).copy_from_slice(&b[..]);
    Self::Align(alloc)
}
Column::Align(a) => {
    let mut alloc: Region<u64> = super::alloc_aligned_zeroed(a.len());
    // The zeroed region is already filled to its capacity, so there is no spare room to
    // extend into; overwrite the zeroed prefix with the source words instead.
    alloc[..a.len()].copy_from_slice(&a[..]);
    Column::Align(alloc)
}
98137
}
99138
}
100139
}
@@ -111,8 +150,7 @@ mod container {
111150
fn clear(&mut self) {
112151
match self {
113152
Column::Typed(t) => t.clear(),
114-
Column::Bytes(_) => *self = Column::Typed(Default::default()),
115-
Column::Align(_) => *self = Column::Typed(Default::default()),
153+
Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
116154
}
117155
}
118156

@@ -159,9 +197,9 @@ mod container {
159197
Self::Bytes(bytes)
160198
} else {
161199
// We failed to cast the slice, so we'll reallocate.
162-
let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
200+
let mut alloc: Region<u64> = super::alloc_aligned_zeroed(bytes.len() / 8);
163201
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
164-
Self::Align(alloc.into())
202+
Self::Align(alloc)
165203
}
166204
}
167205

@@ -183,7 +221,6 @@ mod container {
183221
}
184222
}
185223

186-
pub use builder::ColumnBuilder;
187224
mod builder {
188225
use std::collections::VecDeque;
189226

@@ -192,7 +229,7 @@ mod builder {
192229
use timely::container::PushInto;
193230
use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
194231

195-
use super::Column;
232+
use crate::containers::Column;
196233

197234
/// A container builder for `Column<C>`.
198235
pub struct ColumnBuilder<C: Columnar> {
@@ -219,11 +256,24 @@ mod builder {
219256
let words = Sequence::length_in_words(&self.current.borrow());
220257
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
221258
if round - words < round / 10 {
222-
let mut alloc = Vec::with_capacity(round);
223-
Sequence::encode(&mut alloc, &self.current.borrow());
224-
self.pending
225-
.push_back(Column::Align(alloc.into_boxed_slice()));
226-
self.current.clear();
259+
/// Move the contents from `current` to an aligned allocation, and push it to `pending`.
260+
/// The contents must fit in `round` words (u64).
261+
#[cold]
262+
fn outlined_align<C>(
263+
current: &mut C::Container,
264+
round: usize,
265+
pending: &mut VecDeque<Column<C>>,
266+
) where
267+
C: Columnar,
268+
{
269+
let mut alloc = super::alloc_aligned_zeroed(round);
270+
let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
271+
Sequence::write(writer, &current.borrow()).unwrap();
272+
pending.push_back(Column::Align(alloc));
273+
current.clear();
274+
}
275+
276+
outlined_align(&mut self.current, round, &mut self.pending);
227277
}
228278
}
229279
}
@@ -390,8 +440,6 @@ pub mod batcher {
390440
}
391441
}
392442

393-
pub use provided_builder::ProvidedBuilder;
394-
395443
mod provided_builder {
396444
use timely::container::ContainerBuilder;
397445
use timely::Container;

0 commit comments

Comments
 (0)