diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs
index 86e495445862c..d741de4431843 100644
--- a/src/compute/src/sink/correction_v2.rs
+++ b/src/compute/src/sink/correction_v2.rs
@@ -63,6 +63,13 @@
 //! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
 //! list until the chain invariant is restored.
 //!
+//! Inserting an update into the correction buffer can be expensive: it involves allocating a new
+//! chunk, copying the update in, and then likely merging with an existing chain to restore the
+//! chain invariant. If updates trickle in over many small batches, this can cause considerable
+//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
+//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
+//! [`Chunk`], they are sorted and inserted into the chains.
+//!
 //! The insert operation has an amortized complexity of O(log N), with N being the current number
 //! of updates stored.
 //!
@@ -148,6 +155,8 @@ impl<D: differential_dataflow::Data + Columnation> Data for D {}
 pub(super) struct CorrectionV2<D: Data> {
     /// Chains containing sorted updates.
     chains: Vec<Chain<D>>,
+    /// A staging area for updates, to speed up small inserts.
+    stage: Stage<D>,
     /// The frontier by which all contained times are advanced.
     since: Antichain<Timestamp>,
 
@@ -166,6 +175,7 @@ impl<D: Data> CorrectionV2<D> {
     pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
         Self {
             chains: Default::default(),
+            stage: Default::default(),
             since: Antichain::from_elem(Timestamp::MIN),
             total_size: Default::default(),
             metrics,
@@ -210,38 +220,23 @@ impl<D: Data> CorrectionV2<D> {
     fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
         debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));
 
-        consolidate(updates);
+        if let Some(chain) = self.stage.insert(updates) {
+            self.chains.push(chain);
 
-        let first_update = match updates.first() {
-            Some((d, t, r)) => (d, *t, *r),
-            None => return,
-        };
+            // Restore the chain invariant.
+            let merge_needed = |chains: &[Chain<_>]| match chains {
+                [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
+                _ => false,
+            };
 
-        // Optimization: If all items in `updates` sort after all items in the last chain, we can
-        // append them to the last chain directly instead of constructing a new chain.
-        let chain = match self.chains.last_mut() {
-            Some(chain) if chain.can_accept(first_update) => chain,
-            _ => {
-                self.chains.push(Chain::default());
-                self.chains.last_mut().unwrap()
+            while merge_needed(&self.chains) {
+                let a = self.chains.pop().unwrap();
+                let b = self.chains.pop().unwrap();
+                let merged = merge_chains([a, b], &self.since);
+                self.chains.push(merged);
             }
         };
 
-        chain.extend(updates.drain(..));
-
-        // Restore the chain invariant.
-        let merge_needed = |chains: &[Chain<_>]| match chains {
-            [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
-            _ => false,
-        };
-
-        while merge_needed(&self.chains) {
-            let a = self.chains.pop().unwrap();
-            let b = self.chains.pop().unwrap();
-            let merged = merge_chains([a, b], &self.since);
-            self.chains.push(merged);
-        }
-
         self.update_metrics();
     }
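The rewritten `insert_inner` keeps the existing merge strategy: the staged batch arrives as one new chain, and chains are merged from the tail until the proportionality invariant holds again. A self-contained toy sketch of that invariant, using hypothetical stand-ins (sorted `Vec<u64>`s instead of `Chain`, concat-and-sort instead of `merge_chains`, and an illustrative proportionality of 2 rather than the real `CHAIN_PROPORTIONALITY`):

```rust
const CHAIN_PROPORTIONALITY: usize = 2; // illustrative value only

fn main() {
    // Chains, modeled as sorted runs of updates.
    let mut chains: Vec<Vec<u64>> = Vec::new();

    // Append 1000 single-update batches, restoring the invariant each time.
    for i in 0..1000 {
        chains.push(vec![i]);

        // Same shape as the `merge_needed` closure in `insert_inner`.
        let merge_needed = |chains: &[Vec<u64>]| match chains {
            [.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
            _ => false,
        };

        while merge_needed(&chains) {
            let a = chains.pop().unwrap();
            let b = chains.pop().unwrap();
            // Stand-in for `merge_chains`: merge two sorted runs.
            let mut merged = [b, a].concat();
            merged.sort_unstable();
            chains.push(merged);
        }
    }

    // Each chain is at most half its predecessor's length, so the number of
    // chains stays logarithmic in the number of stored updates. This is what
    // makes inserts amortized O(log N), as the module docs state.
    assert!(chains.len() <= 10);
    println!("chains: {}", chains.len());
}
```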
@@ -277,11 +272,15 @@ impl<D: Data> CorrectionV2<D> {
     /// Once this method returns, all remaining updates before `upper` are contained in a single
     /// chain. Note that this chain might also contain updates beyond `upper` though!
     fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
-        if self.chains.is_empty() {
+        if self.chains.is_empty() && self.stage.is_empty() {
             return;
         }
 
-        let chains = std::mem::take(&mut self.chains);
+        let mut chains = std::mem::take(&mut self.chains);
+        if let Some(chain) = self.stage.flush() {
+            chains.push(chain);
+        }
+
         let (merged, remains) = merge_chains_up_to(chains, &self.since, upper);
         self.chains = remains;
 
@@ -327,6 +326,7 @@ impl<D: Data> CorrectionV2<D> {
     /// Panics if the given `since` is less than the current since frontier.
     pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
         assert!(PartialOrder::less_equal(&self.since, &since));
+        self.stage.advance_times(&since);
         self.since = since;
     }
 
@@ -341,7 +341,7 @@ impl<D: Data> CorrectionV2<D> {
     /// Update persist sink metrics.
     fn update_metrics(&mut self) {
-        let mut new_size = LengthAndCapacity::default();
+        let mut new_size = self.stage.get_size();
         for chain in &mut self.chains {
             new_size += chain.get_size();
         }
@@ -825,6 +825,11 @@ impl<D: Data> Chunk<D> {
         Container::len(&self.data)
     }
 
+    /// Return the (local) capacity of the chunk.
+    fn capacity(&self) -> usize {
+        self.data.capacity()
+    }
+
     /// Return whether the chunk is at capacity.
     fn at_capacity(&self) -> bool {
         self.data.at_capacity()
@@ -899,6 +904,104 @@ impl<D: Data> Chunk<D> {
     }
 }
 
+/// A buffer for staging updates before they are inserted into the sorted chains.
+#[derive(Debug)]
+struct Stage<D> {
+    /// The contained updates.
+    ///
+    /// This vector has a fixed capacity equal to the [`Chunk`] capacity.
+    data: Vec<(D, Timestamp, Diff)>,
+}
+
+impl<D: Data> Default for Stage<D> {
+    fn default() -> Self {
+        // Make sure that the `Stage` has the same capacity as a `Chunk`.
+        let chunk = Chunk::<D>::default();
+        let data = Vec::with_capacity(chunk.capacity());
+        Self { data }
+    }
+}
+
+impl<D: Data> Stage<D> {
+    /// Return whether the stage is empty.
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    /// Insert a batch of updates, possibly producing a ready [`Chain`].
+    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
+        if updates.is_empty() {
+            return None;
+        }
+
+        // Determine how many chunks we can fill with the available updates.
+        let update_count = self.data.len() + updates.len();
+        let chunk_size = self.data.capacity();
+        let chunk_count = update_count / chunk_size;
+
+        let mut new_updates = updates.drain(..);
+
+        // If we have enough shippable updates, collect them, consolidate, and build a chain.
+        let maybe_chain = if chunk_count > 0 {
+            let ship_count = chunk_count * chunk_size;
+            let mut buffer = Vec::with_capacity(ship_count);
+
+            buffer.append(&mut self.data);
+            while buffer.len() < ship_count {
+                let update = new_updates.next().unwrap();
+                buffer.push(update);
+            }
+
+            consolidate(&mut buffer);
+
+            let mut chain = Chain::default();
+            chain.extend(buffer);
+            Some(chain)
+        } else {
+            None
+        };
+
+        // Stage the remaining updates.
+        self.data.extend(new_updates);
+
+        maybe_chain
+    }
+
+    /// Flush all currently staged updates into a chain.
+    fn flush(&mut self) -> Option<Chain<D>> {
+        consolidate(&mut self.data);
+
+        if self.data.is_empty() {
+            return None;
+        }
+
+        let mut chain = Chain::default();
+        chain.extend(self.data.drain(..));
+        Some(chain)
+    }
+
+    /// Advance the times of staged updates by the given `since`.
+    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
+        let Some(since_ts) = since.as_option() else {
+            // If the since is the empty frontier, discard all updates.
+            self.data.clear();
+            return;
+        };
+
+        for (_, time, _) in &mut self.data {
+            *time = std::cmp::max(*time, *since_ts);
+        }
+    }
+
+    /// Return the size of the stage, for use in metrics.
+    fn get_size(&mut self) -> LengthAndCapacity {
+        LengthAndCapacity {
+            length: self.data.len(),
+            capacity: self.data.capacity(),
+        }
+    }
+}
+
 /// Sort and consolidate the given list of updates.
 ///
 /// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
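The chunk arithmetic at the top of `Stage::insert` is worth seeing in isolation: the stage ships as many whole chunks' worth of updates as it can and keeps the remainder staged. A minimal sketch of just that split; the `split_for_shipping` helper is purely illustrative, and the chunk capacity of 4 is a hypothetical value (the real one is the fixed `Vec` capacity copied from `Chunk::capacity()`):

```rust
// Ship as many whole chunks' worth of updates as possible; keep the rest staged.
fn split_for_shipping(staged: usize, incoming: usize, chunk_size: usize) -> (usize, usize) {
    let update_count = staged + incoming;
    let chunk_count = update_count / chunk_size;
    let ship_count = chunk_count * chunk_size;
    (ship_count, update_count - ship_count)
}

fn main() {
    // 3 staged + 6 incoming = 9 updates: two full chunks (8 updates) ship as
    // a new chain, 1 update stays staged.
    assert_eq!(split_for_shipping(3, 6, 4), (8, 1));
    // Less than one full chunk in total: nothing ships, all 3 stay staged.
    assert_eq!(split_for_shipping(1, 2, 4), (0, 3));
}
```

Since `insert` consolidates the shipped buffer before building the chain, the resulting chain may well contain fewer than `ship_count` updates; that is fine, as the chain invariant constrains relative chain lengths rather than chunk fullness.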
diff --git a/src/timely-util/src/containers/stack.rs b/src/timely-util/src/containers/stack.rs
index 10fc428617bf5..45ef191e23502 100644
--- a/src/timely-util/src/containers/stack.rs
+++ b/src/timely-util/src/containers/stack.rs
@@ -49,6 +49,14 @@ impl<T: Columnation> StackWrapper<T> {
         }
     }
 
+    /// Return the capacity of the local buffer.
+    pub fn capacity(&self) -> usize {
+        match self {
+            Self::Legacy(stack) => stack.capacity(),
+            Self::Chunked(stack) => stack.capacity(),
+        }
+    }
+
     /// Estimate the memory capacity in bytes.
     #[inline]
     pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
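A note on the `Stage::advance_times` method added above: forwarding staged times to the since frontier makes updates at previously distinct times identical, so a later consolidation can cancel them against each other. A self-contained sketch with simplified stand-in types (`&str`/`u64`/`i64` tuples instead of the real `D`, `Timestamp`, `Diff`):

```rust
fn main() {
    // Staged (data, time, diff) updates: a retraction at time 5 chasing an
    // insertion at time 3.
    let mut staged = vec![("a", 3u64, 1i64), ("a", 5, -1)];
    let since_ts = 7u64;

    // What `advance_times` does: forward every staged time to the since.
    for (_, time, _) in &mut staged {
        *time = std::cmp::max(*time, since_ts);
    }

    // A minimal consolidation: sum the diffs of identical (data, time) pairs
    // and drop those that reach zero.
    staged.sort();
    let mut consolidated: Vec<(&str, u64, i64)> = Vec::new();
    for (d, t, r) in staged {
        match consolidated.last_mut() {
            Some((pd, pt, pr)) if *pd == d && *pt == t => *pr += r,
            _ => consolidated.push((d, t, r)),
        }
    }
    consolidated.retain(|&(_, _, r)| r != 0);

    // Both updates now sit at time 7 and cancel entirely.
    assert!(consolidated.is_empty());
}
```

The empty-frontier branch in `advance_times` is the limiting case: an empty since frontier means the contents will never be read again, so the staged updates can simply be discarded.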