Skip to content

Commit

Permalink
compute/correction-v2: introduce a Stage to speed up small inserts
Browse files Browse the repository at this point in the history
This commit extends the `CorrectionV2` data structure with a `Stage`
that accumulates inserted updates before they get inserted into the
sorted chains. This significantly reduces the number of chain merges
that need to be done in workloads that trickle in large numbers of
updates in small batches, greatly speeding up these workloads.
  • Loading branch information
teskje committed Feb 7, 2025
1 parent a04bf4d commit 0e93a6c
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 30 deletions.
163 changes: 133 additions & 30 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@
//! A batch of updates is appended as a new chain. Then chains are merged at the end of the chain
//! list until the chain invariant is restored.
//!
//! Inserting an update into the correction buffer can be expensive: It involves allocating a new
//! chunk, copying the update in, and then likely merging with an existing chain to restore the
//! chain invariant. If updates trickle in in small batches, this can cause a considerable
//! overhead. To amortize this overhead, new updates aren't immediately inserted into the sorted
//! chains but instead stored in a [`Stage`] buffer. Once enough updates have been staged to fill a
//! [`Chunk`], they are sorted and inserted into the chains.
//!
//! The insert operation has an amortized complexity of O(log N), with N being the current number
//! of updates stored.
//!
Expand Down Expand Up @@ -148,6 +155,8 @@ impl<D: differential_dataflow::Data + Columnation> Data for D {}
pub(super) struct CorrectionV2<D: Data> {
/// Chains containing sorted updates.
chains: Vec<Chain<D>>,
/// A staging area for updates, to speed up small inserts.
stage: Stage<D>,
/// The frontier by which all contained times are advanced.
since: Antichain<Timestamp>,

Expand All @@ -166,6 +175,7 @@ impl<D: Data> CorrectionV2<D> {
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
Self {
chains: Default::default(),
stage: Default::default(),
since: Antichain::from_elem(Timestamp::MIN),
total_size: Default::default(),
metrics,
Expand Down Expand Up @@ -210,38 +220,23 @@ impl<D: Data> CorrectionV2<D> {
fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

consolidate(updates);
if let Some(chain) = self.stage.insert(updates) {
self.chains.push(chain);

let first_update = match updates.first() {
Some((d, t, r)) => (d, *t, *r),
None => return,
};
// Restore the chain invariant.
let merge_needed = |chains: &[Chain<_>]| match chains {
[.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
_ => false,
};

// Optimization: If all items in `updates` sort after all items in the last chain, we can
// append them to the last chain directly instead of constructing a new chain.
let chain = match self.chains.last_mut() {
Some(chain) if chain.can_accept(first_update) => chain,
_ => {
self.chains.push(Chain::default());
self.chains.last_mut().unwrap()
while merge_needed(&self.chains) {
let a = self.chains.pop().unwrap();
let b = self.chains.pop().unwrap();
let merged = merge_chains([a, b], &self.since);
self.chains.push(merged);
}
};

chain.extend(updates.drain(..));

// Restore the chain invariant.
let merge_needed = |chains: &[Chain<_>]| match chains {
[.., prev, last] => last.len() * CHAIN_PROPORTIONALITY > prev.len(),
_ => false,
};

while merge_needed(&self.chains) {
let a = self.chains.pop().unwrap();
let b = self.chains.pop().unwrap();
let merged = merge_chains([a, b], &self.since);
self.chains.push(merged);
}

self.update_metrics();
}

Expand Down Expand Up @@ -277,11 +272,15 @@ impl<D: Data> CorrectionV2<D> {
/// Once this method returns, all remaining updates before `upper` are contained in a single
/// chain. Note that this chain might also contain updates beyond `upper` though!
fn consolidate_before(&mut self, upper: &Antichain<Timestamp>) {
if self.chains.is_empty() {
if self.chains.is_empty() && self.stage.is_empty() {
return;
}

let chains = std::mem::take(&mut self.chains);
let mut chains = std::mem::take(&mut self.chains);
if let Some(chain) = self.stage.flush() {
chains.push(chain);
}

let (merged, remains) = merge_chains_up_to(chains, &self.since, upper);

self.chains = remains;
Expand Down Expand Up @@ -327,6 +326,7 @@ impl<D: Data> CorrectionV2<D> {
/// Panics if the given `since` is less than the current since frontier.
pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
// The since frontier may only stay put or move forward; anything else is a caller bug and
// triggers the documented panic.
assert!(PartialOrder::less_equal(&self.since, &since));
// Staged updates are advanced right away, before the new frontier is stored.
// NOTE(review): the chains are not touched here; they appear to be advanced when they are
// merged (merges are passed `self.since`) — confirm.
self.stage.advance_times(&since);
self.since = since;
}

Expand All @@ -341,7 +341,7 @@ impl<D: Data> CorrectionV2<D> {

/// Update persist sink metrics.
fn update_metrics(&mut self) {
let mut new_size = LengthAndCapacity::default();
let mut new_size = self.stage.get_size();
for chain in &mut self.chains {
new_size += chain.get_size();
}
Expand Down Expand Up @@ -825,6 +825,11 @@ impl<D: Data> Chunk<D> {
Container::len(&self.data)
}

/// Return the (local) capacity of the chunk.
///
/// This simply forwards to the `capacity` of the underlying `data` container. It is used to
/// size the [`Stage`] buffer so that a full stage corresponds to one chunk's capacity.
fn capacity(&self) -> usize {
self.data.capacity()
}

/// Return whether the chunk is at capacity.
fn at_capacity(&self) -> bool {
self.data.at_capacity()
Expand Down Expand Up @@ -899,6 +904,104 @@ impl<D: Data> Chunk<D> {
}
}

/// A buffer for staging updates before they are inserted into the sorted chains.
///
/// Staged updates are not kept sorted or consolidated; that only happens at the point where
/// they are turned into a [`Chain`] (see `insert` and `flush`).
#[derive(Debug)]
struct Stage<D> {
/// The contained updates, as `(data, time, diff)` triples.
///
/// This vector has a fixed capacity equal to the [`Chunk`] capacity: it is allocated with
/// that capacity by the `Default` implementation, and `insert` uses the vector's capacity as
/// the chunk size when deciding how many updates to ship.
data: Vec<(D, Timestamp, Diff)>,
}

impl<D: Data> Default for Stage<D> {
    /// Create an empty stage whose buffer is pre-allocated to hold one [`Chunk`]'s worth of
    /// updates.
    fn default() -> Self {
        // A default chunk is only created to learn its capacity, so that the stage and the
        // chunks agree on how many updates make up a full chunk.
        let chunk_capacity = Chunk::<D>::default().capacity();
        Self {
            data: Vec::with_capacity(chunk_capacity),
        }
    }
}

impl<D: Data> Stage<D> {
    /// Report whether no updates are currently staged.
    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Stage a batch of updates, draining `updates` completely.
    ///
    /// If the staged and new updates together fill at least one chunk, as many whole chunks'
    /// worth of updates as possible are consolidated and returned as a [`Chain`]. Whatever does
    /// not fill a whole chunk stays staged. Returns `None` if nothing was shipped.
    fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) -> Option<Chain<D>> {
        if updates.is_empty() {
            return None;
        }

        // Work out how many whole chunks the staged plus incoming updates can fill. The stage's
        // capacity doubles as the chunk size.
        let total_count = self.data.len() + updates.len();
        let chunk_size = self.data.capacity();
        let full_chunks = total_count / chunk_size;

        let mut incoming = updates.drain(..);

        // Not enough for a single chunk yet: just stage everything.
        if full_chunks == 0 {
            self.data.extend(incoming);
            return None;
        }

        // We have enough shippable updates: gather the staged ones first, then top up from the
        // incoming batch until we have an exact multiple of the chunk size.
        let ship_count = full_chunks * chunk_size;
        let mut shipped = Vec::with_capacity(ship_count);
        shipped.append(&mut self.data);
        let missing = ship_count - shipped.len();
        shipped.extend(incoming.by_ref().take(missing));

        consolidate(&mut shipped);

        let mut chain = Chain::default();
        chain.extend(shipped);

        // Anything left over in the incoming batch is staged for a later shipment.
        self.data.extend(incoming);

        Some(chain)
    }

    /// Consolidate everything that is currently staged and move it into a new [`Chain`].
    ///
    /// Returns `None` if nothing remains after consolidation. The stage is empty afterwards.
    fn flush(&mut self) -> Option<Chain<D>> {
        consolidate(&mut self.data);

        (!self.data.is_empty()).then(|| {
            let mut chain = Chain::default();
            chain.extend(self.data.drain(..));
            chain
        })
    }

    /// Advance the times of all staged updates to be at least the given `since`.
    fn advance_times(&mut self, since: &Antichain<Timestamp>) {
        match since.as_option() {
            Some(&frontier) => {
                for (_, time, _) in self.data.iter_mut() {
                    *time = std::cmp::max(*time, frontier);
                }
            }
            // The empty frontier means no time is beyond the since, so drop all staged updates.
            None => self.data.clear(),
        }
    }

    /// Report the length and capacity of the staging buffer, for use in metrics.
    fn get_size(&mut self) -> LengthAndCapacity {
        let length = self.data.len();
        let capacity = self.data.capacity();
        LengthAndCapacity { length, capacity }
    }
}

/// Sort and consolidate the given list of updates.
///
/// This function is the same as [`differential_dataflow::consolidation::consolidate_updates`],
Expand Down
8 changes: 8 additions & 0 deletions src/timely-util/src/containers/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ impl<T: Columnation> StackWrapper<T> {
}
}

/// Return the capacity of the local buffer.
///
/// Forwards to the `capacity` method of whichever stack implementation this wrapper holds.
pub fn capacity(&self) -> usize {
match self {
Self::Legacy(stack) => stack.capacity(),
Self::Chunked(stack) => stack.capacity(),
}
}

/// Estimate the memory capacity in bytes.
#[inline]
pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
Expand Down

0 comments on commit 0e93a6c

Please sign in to comment.