From 0e93a6c3bdb99d374c9afefa119705e836ee275d Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Fri, 7 Feb 2025 18:24:16 +0100 Subject: [PATCH] compute/correction-v2: introduce a Stage to speed up small inserts This commit extends the `CorrectionV2` data structure with a `Stage` that accumulates inserted updates before they get inserted into the sorted chains. This significantly reduces the amount of chain merges that need to be done in workloads that trickle in large amounts of updates in small batches, greatly speeding up these workloads. --- src/compute/src/sink/correction_v2.rs | 163 +++++++++++++++++++----- src/timely-util/src/containers/stack.rs | 8 ++ 2 files changed, 141 insertions(+), 30 deletions(-) diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs index 86e495445862c..d741de4431843 100644 --- a/src/compute/src/sink/correction_v2.rs +++ b/src/compute/src/sink/correction_v2.rs @@ -63,6 +63,13 @@ //! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain //! list until the chain invariant is restored. //! +//! Inserting an update into the correction buffer can be expensive: It involves allocating a new +//! chunk, copying the update in, and then likely merging with an existing chain to restore the +//! chain invariant. If updates trickle in in small batches, this can cause a considerable +//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted +//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a +//! [`Chunk`], they are sorted and inserted into the chains. +//! //! The insert operation has an amortized complexity of O(log N), with N being the current number //! of updates stored. //! @@ -148,6 +155,8 @@ impl Data for D {} pub(super) struct CorrectionV2 { /// Chains containing sorted updates. chains: Vec>, + /// A staging area for updates, to speed up small inserts. 
+ stage: Stage, /// The frontier by which all contained times are advanced. since: Antichain, @@ -166,6 +175,7 @@ impl CorrectionV2 { pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self { Self { chains: Default::default(), + stage: Default::default(), since: Antichain::from_elem(Timestamp::MIN), total_size: Default::default(), metrics, @@ -210,38 +220,23 @@ impl CorrectionV2 { fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t))); - consolidate(updates); + if let Some(chain) = self.stage.insert(updates) { + self.chains.push(chain); - let first_update = match updates.first() { - Some((d, t, r)) => (d, *t, *r), - None => return, - }; + // Restore the chain invariant. + let merge_needed = |chains: &[Chain<_>]| match chains { + [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(), + _ => false, + }; - // Optimization: If all items in `updates` sort after all items in the last chain, we can - // append them to the last chain directly instead of constructing a new chain. - let chain = match self.chains.last_mut() { - Some(chain) if chain.can_accept(first_update) => chain, - _ => { - self.chains.push(Chain::default()); - self.chains.last_mut().unwrap() + while merge_needed(&self.chains) { + let a = self.chains.pop().unwrap(); + let b = self.chains.pop().unwrap(); + let merged = merge_chains([a, b], &self.since); + self.chains.push(merged); } }; - chain.extend(updates.drain(..)); - - // Restore the chain invariant. 
- let merge_needed = |chains: &[Chain<_>]| match chains { - [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(), - _ => false, - }; - - while merge_needed(&self.chains) { - let a = self.chains.pop().unwrap(); - let b = self.chains.pop().unwrap(); - let merged = merge_chains([a, b], &self.since); - self.chains.push(merged); - } - self.update_metrics(); } @@ -277,11 +272,15 @@ impl CorrectionV2 { /// Once this method returns, all remaining updates before `upper` are contained in a single /// chain. Note that this chain might also contain updates beyond `upper` though! fn consolidate_before(&mut self, upper: &Antichain) { - if self.chains.is_empty() { + if self.chains.is_empty() && self.stage.is_empty() { return; } - let chains = std::mem::take(&mut self.chains); + let mut chains = std::mem::take(&mut self.chains); + if let Some(chain) = self.stage.flush() { + chains.push(chain); + } + let (merged, remains) = merge_chains_up_to(chains, &self.since, upper); self.chains = remains; @@ -327,6 +326,7 @@ impl CorrectionV2 { /// Panics if the given `since` is less than the current since frontier. pub fn advance_since(&mut self, since: Antichain) { assert!(PartialOrder::less_equal(&self.since, &since)); + self.stage.advance_times(&since); self.since = since; } @@ -341,7 +341,7 @@ impl CorrectionV2 { /// Update persist sink metrics. fn update_metrics(&mut self) { - let mut new_size = LengthAndCapacity::default(); + let mut new_size = self.stage.get_size(); for chain in &mut self.chains { new_size += chain.get_size(); } @@ -825,6 +825,11 @@ impl Chunk { Container::len(&self.data) } + /// Return the (local) capacity of the chunk. + fn capacity(&self) -> usize { + self.data.capacity() + } + /// Return whether the chunk is at capacity. fn at_capacity(&self) -> bool { self.data.at_capacity() @@ -899,6 +904,104 @@ impl Chunk { } } +/// A buffer for staging updates before they are inserted into the sorted chains. 
+#[derive(Debug)] +struct Stage { + /// The contained updates. + /// + /// This vector has a fixed capacity equal to the [`Chunk`] capacity. + data: Vec<(D, Timestamp, Diff)>, +} + +impl Default for Stage { + fn default() -> Self { + // Make sure that the `Stage` has the same capacity as a `Chunk`. + let chunk = Chunk::::default(); + let data = Vec::with_capacity(chunk.capacity()); + Self { data } + } +} + +impl Stage { + /// Return whether the stage is empty. + fn is_empty(&self) -> bool { + self.data.is_empty() + } + + /// Insert a batch of updates, possibly producing a ready [`Chain`]. + fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option> { + if updates.is_empty() { + return None; + } + + // Determine how many chunks we can fill with the available updates. + let update_count = self.data.len() + updates.len(); + let chunk_size = self.data.capacity(); + let chunk_count = update_count / chunk_size; + + let mut new_updates = updates.drain(..); + + // If we have enough shippable updates, collect them, consolidate, and build a chain. + let maybe_chain = if chunk_count > 0 { + let ship_count = chunk_count * chunk_size; + let mut buffer = Vec::with_capacity(ship_count); + + buffer.append(&mut self.data); + while buffer.len() < ship_count { + let update = new_updates.next().unwrap(); + buffer.push(update); + } + + consolidate(&mut buffer); + + let mut chain = Chain::default(); + chain.extend(buffer); + Some(chain) + } else { + None + }; + + // Stage the remaining updates. + self.data.extend(new_updates); + + maybe_chain + } + + /// Flush all currently staged updates into a chain. + fn flush(&mut self) -> Option> { + consolidate(&mut self.data); + + if self.data.is_empty() { + return None; + } + + let mut chain = Chain::default(); + chain.extend(self.data.drain(..)); + Some(chain) + } + + /// Advance the times of staged updates by the given `since`. 
+ fn advance_times(&mut self, since: &Antichain) { + let Some(since_ts) = since.as_option() else { + // If the since is the empty frontier, discard all updates. + self.data.clear(); + return; + }; + + for (_, time, _) in &mut self.data { + *time = std::cmp::max(*time, *since_ts); + } + } + + /// Return the size of the stage, for use in metrics. + fn get_size(&mut self) -> LengthAndCapacity { + LengthAndCapacity { + length: self.data.len(), + capacity: self.data.capacity(), + } + } +} + /// Sort and consolidate the given list of updates. /// /// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`], diff --git a/src/timely-util/src/containers/stack.rs b/src/timely-util/src/containers/stack.rs index 10fc428617bf5..45ef191e23502 100644 --- a/src/timely-util/src/containers/stack.rs +++ b/src/timely-util/src/containers/stack.rs @@ -49,6 +49,14 @@ impl StackWrapper { } } + /// Return the capacity of the local buffer. + pub fn capacity(&self) -> usize { + match self { + Self::Legacy(stack) => stack.capacity(), + Self::Chunked(stack) => stack.capacity(), + } + } + /// Estimate the memory capacity in bytes. #[inline] pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {