Skip to content

Commit 9705303

Browse files
committed
Return implication changes from propagate_all
This commit makes the reachability tracker directly return changes to implications from `propagate_all`, rather than stashing them and expecting callers to access them through a separate `pushed` method. This API changes allows `propagate_all` to assume that the caller will consume the changes before the next call to `propagate_all`, which in turn guarantees that `pushed_changes` only contains changes from the last `propagate_all` call, rather than possibly changes from previous calls as well.
1 parent 64be92b commit 9705303

File tree

2 files changed

+12
-19
lines changed

2 files changed

+12
-19
lines changed

timely/src/progress/reachability.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,9 @@
3333
//! tracker.update_source(Source::new(0, 0), 17, 1);
3434
//!
3535
//! // Propagate changes; until this call updates are simply buffered.
36-
//! tracker.propagate_all();
36+
//! let updates = tracker.propagate_all();
3737
//!
38-
//! let mut results =
39-
//! tracker
40-
//! .pushed()
41-
//! .drain()
38+
//! let mut results = updates
4239
//! .filter(|((location, time), delta)| location.is_target())
4340
//! .collect::<Vec<_>>();
4441
//!
@@ -55,12 +52,9 @@
5552
//! tracker.update_source(Source::new(0, 0), 17, -1);
5653
//!
5754
//! // Propagate changes; until this call updates are simply buffered.
58-
//! tracker.propagate_all();
55+
//! let updates = tracker.propagate_all();
5956
//!
60-
//! let mut results =
61-
//! tracker
62-
//! .pushed()
63-
//! .drain()
57+
//! let mut results = updates
6458
//! .filter(|((location, time), delta)| location.is_target())
6559
//! .collect::<Vec<_>>();
6660
//!
@@ -564,8 +558,10 @@ impl<T:Timestamp> Tracker<T> {
564558
/// Propagates all pending updates.
565559
///
566560
/// The method drains `self.input_changes` and circulates their implications
567-
/// until we cease deriving new implications.
568-
pub fn propagate_all(&mut self) {
561+
/// until we cease deriving new implications. It returns an iterator over updates
562+
/// to implications.
563+
pub fn propagate_all(&mut self) -> impl Iterator<Item = ((Location, T), i64)> + '_ {
564+
self.pushed_changes.clear();
569565

570566
// Step 0: If logging is enabled, construct and log inbound changes.
571567
if let Some(logger) = &mut self.logger {
@@ -699,18 +695,15 @@ impl<T:Timestamp> Tracker<T> {
699695
};
700696
}
701697
}
698+
699+
self.pushed_changes.drain()
702700
}
703701

704702
/// Implications of maintained capabilities projected to each output.
705703
pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
706704
&mut self.output_changes[..]
707705
}
708706

709-
/// A mutable reference to the pushed results of changes.
710-
pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
711-
&mut self.pushed_changes
712-
}
713-
714707
/// Reveals per-operator frontier state.
715708
pub fn node_state(&self, index: usize) -> &PerOperator<T> {
716709
&self.per_operator[index]

timely/src/progress/subgraph.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,10 @@ where
459459
}
460460

461461
// Propagate implications of progress changes.
462-
self.pointstamp_tracker.propagate_all();
462+
let updates = self.pointstamp_tracker.propagate_all();
463463

464464
// Drain propagated information into shared progress structure.
465-
for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
465+
for ((location, time), diff) in updates {
466466
self.maybe_shutdown.push(location.node);
467467
// Targets are actionable, sources are not.
468468
if let crate::progress::Port::Target(port) = location.port {

0 commit comments

Comments
 (0)