Skip to content

Commit b43381b

Browse files
Merge pull request #658 from frankmcsherry/linear_reachability_2
Linear reachability, without a shadowing bug
2 parents 94620a8 + 62fef07 commit b43381b

File tree

12 files changed

+279
-143
lines changed

12 files changed

+279
-143
lines changed

timely/src/dataflow/operators/capability.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use std::cell::RefCell;
2727
use std::fmt::{self, Debug};
2828

2929
use crate::order::PartialOrder;
30-
use crate::progress::Antichain;
3130
use crate::progress::Timestamp;
3231
use crate::progress::ChangeBatch;
32+
use crate::progress::operate::PortConnectivity;
3333
use crate::scheduling::Activations;
3434
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3535

@@ -238,26 +238,28 @@ pub struct InputCapability<T: Timestamp> {
238238
/// Output capability buffers, for use in minting capabilities.
239239
internal: CapabilityUpdates<T>,
240240
/// Timestamp summaries for each output.
241-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
241+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242242
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
243243
consumed_guard: ConsumedGuard<T>,
244244
}
245245

246246
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247247
fn time(&self) -> &T { self.time() }
248248
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249-
let borrow = self.summaries.borrow();
250-
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
251-
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252-
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
253-
})
249+
let summaries_borrow = self.summaries.borrow();
250+
let internal_borrow = self.internal.borrow();
251+
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252+
let result = summaries_borrow.iter_ports().any(|(port, path)| {
253+
Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
254+
});
255+
result
254256
}
255257
}
256258

257259
impl<T: Timestamp> InputCapability<T> {
258260
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
259261
/// the provided [`ChangeBatch`].
260-
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
262+
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
261263
InputCapability {
262264
internal,
263265
summaries,
@@ -281,10 +283,15 @@ impl<T: Timestamp> InputCapability<T> {
281283
/// Delays capability for a specific output port.
282284
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
283285
use crate::progress::timestamp::PathSummary;
284-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
285-
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
286-
} else {
287-
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
286+
if let Some(path) = self.summaries.borrow().get(output_port) {
287+
if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
288+
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
289+
} else {
290+
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
291+
}
292+
}
293+
else {
294+
panic!("Attempted to delay a capability for a disconnected output");
288295
}
289296
}
290297

@@ -305,11 +312,16 @@ impl<T: Timestamp> InputCapability<T> {
305312
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
306313
use crate::progress::timestamp::PathSummary;
307314
let self_time = self.time().clone();
308-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
309-
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
315+
if let Some(path) = self.summaries.borrow().get(output_port) {
316+
if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
317+
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
318+
}
319+
else {
320+
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
321+
}
310322
}
311323
else {
312-
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
324+
panic!("Attempted to retain a capability for a disconnected output");
313325
}
314326
}
315327
}

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
113113
let summary = handle.summary;
114114
let mut output = handle.output;
115115

116-
let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
116+
let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
117117

118118
builder.build(move |_capability| move |_frontier| {
119119
let mut output = output.activate();

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1211
use crate::progress::Source;
13-
12+
use crate::progress::operate::Connectivity;
1413
use crate::{Container, Data};
1514
use crate::communication::Push;
1615
use crate::dataflow::{Scope, ScopeParent, StreamCore};
@@ -205,7 +204,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
205204
fn inputs(&self) -> usize { 0 }
206205
fn outputs(&self) -> usize { 1 }
207206

208-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
207+
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
209208
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210209
(Vec::new(), Rc::clone(&self.shared_progress))
211210
}

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder};
77

88
use crate::scheduling::{Schedule, ActivateOnDrop};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
1211
use crate::progress::Source;
1312
use crate::progress::ChangeBatch;
14-
13+
use crate::progress::operate::Connectivity;
1514
use crate::dataflow::channels::pushers::{Counter, Tee};
1615
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
1716

@@ -134,7 +133,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134133
fn inputs(&self) -> usize { 0 }
135134
fn outputs(&self) -> usize { 1 }
136135

137-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
136+
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138137
let mut borrow = self.internal.borrow_mut();
139138
for (time, count) in borrow.drain() {
140139
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15-
15+
use crate::progress::operate::{Connectivity, PortConnectivity};
1616
use crate::Container;
1717
use crate::dataflow::{StreamCore, Scope};
1818
use crate::dataflow::channels::pushers::Tee;
@@ -60,7 +60,7 @@ pub struct OperatorBuilder<G: Scope> {
6060
global: usize,
6161
address: Rc<[usize]>, // path to the operator (ending with index).
6262
shape: OperatorShape,
63-
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
63+
summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
6464
}
6565

6666
impl<G: Scope> OperatorBuilder<G> {
@@ -105,48 +105,53 @@ impl<G: Scope> OperatorBuilder<G> {
105105

106106
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
107107
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
108-
where
109-
P: ParallelizationContract<G::Timestamp, C> {
110-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
108+
where
109+
P: ParallelizationContract<G::Timestamp, C>
110+
{
111+
let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
111112
self.new_input_connection(stream, pact, connection)
112113
}
113114

114115
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
115-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
116+
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> P::Puller
116117
where
117-
P: ParallelizationContract<G::Timestamp, C> {
118-
118+
P: ParallelizationContract<G::Timestamp, C>,
119+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
120+
{
119121
let channel_id = self.scope.new_identifier();
120122
let logging = self.scope.logging();
121123
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
122124
let target = Target::new(self.index, self.shape.inputs);
123125
stream.connect_to(target, sender, channel_id);
124126

125127
self.shape.inputs += 1;
126-
assert_eq!(self.shape.outputs, connection.len());
127-
self.summary.push(connection);
128+
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
129+
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
130+
self.summary.push(connectivity);
128131

129132
receiver
130133
}
131134

132135
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
133136
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
134137

135-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
138+
let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
136139
self.new_output_connection(connection)
137140
}
138141

139142
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
140-
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
141-
143+
pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, StreamCore<G, C>)
144+
where
145+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
146+
{
147+
let new_output = self.shape.outputs;
148+
self.shape.outputs += 1;
142149
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
143-
let source = Source::new(self.index, self.shape.outputs);
150+
let source = Source::new(self.index, new_output);
144151
let stream = StreamCore::new(source, registrar, self.scope.clone());
145152

146-
self.shape.outputs += 1;
147-
assert_eq!(self.shape.inputs, connection.len());
148-
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
149-
summary.push(entry);
153+
for (input, entry) in connection {
154+
self.summary[input].add_port(new_output, entry);
150155
}
151156

152157
(targets, stream)
@@ -188,7 +193,7 @@ where
188193
logic: L,
189194
shared_progress: Rc<RefCell<SharedProgress<T>>>,
190195
activations: Rc<RefCell<Activations>>,
191-
summary: Vec<Vec<Antichain<T::Summary>>>,
196+
summary: Connectivity<T::Summary>,
192197
}
193198

194199
impl<T, L> Schedule for OperatorCore<T, L>
@@ -213,7 +218,7 @@ where
213218
fn outputs(&self) -> usize { self.shape.outputs }
214219

215220
// announce internal topology as fully connected, and hold all default capabilities.
216-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
221+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
217222

218223
// Request the operator to be scheduled at least once.
219224
self.activations.borrow_mut().activate(&self.address[..]);

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability;
2020
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
2121
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2222
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23-
23+
use crate::progress::operate::PortConnectivity;
2424
use crate::logging::TimelyLogger as Logger;
2525

2626
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
@@ -33,7 +33,7 @@ pub struct OperatorBuilder<G: Scope> {
3333
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3434
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
3535
/// For each input, a shared list of summaries to each output.
36-
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
36+
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
3737
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3838
logging: Option<Logger>,
3939
}
@@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
6464
where
6565
P: ParallelizationContract<G::Timestamp, C> {
6666

67-
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
67+
let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
6868
self.new_input_connection(stream, pact, connection)
6969
}
7070

@@ -76,25 +76,26 @@ impl<G: Scope> OperatorBuilder<G> {
7676
///
7777
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
7878
/// antichain indicating that there is no connection from the input to the output.
79-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
80-
where
81-
P: ParallelizationContract<G::Timestamp, C> {
82-
79+
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
80+
where
81+
P: ParallelizationContract<G::Timestamp, C>,
82+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
83+
{
8384
let puller = self.builder.new_input_connection(stream, pact, connection.clone());
8485

8586
let input = PullCounter::new(puller);
8687
self.frontier.push(MutableAntichain::new());
8788
self.consumed.push(Rc::clone(input.consumed()));
8889

89-
let shared_summary = Rc::new(RefCell::new(connection));
90+
let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
9091
self.summaries.push(Rc::clone(&shared_summary));
9192

9293
new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
9394
}
9495

9596
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
9697
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
97-
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
98+
let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
9899
self.new_output_connection(connection)
99100
}
100101

@@ -106,14 +107,14 @@ impl<G: Scope> OperatorBuilder<G> {
106107
///
107108
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
108109
/// antichain indicating that there is no connection from the input to the output.
109-
pub fn new_output_connection<CB: ContainerBuilder>(
110-
&mut self,
111-
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
112-
) -> (
110+
pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
113111
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
114112
StreamCore<G, CB::Container>
115-
) {
116-
113+
)
114+
where
115+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
116+
{
117+
let new_output = self.shape().outputs();
117118
let (tee, stream) = self.builder.new_output_connection(connection.clone());
118119

119120
let internal = Rc::new(RefCell::new(ChangeBatch::new()));
@@ -122,8 +123,8 @@ impl<G: Scope> OperatorBuilder<G> {
122123
let mut buffer = PushBuffer::new(PushCounter::new(tee));
123124
self.produced.push(Rc::clone(buffer.inner().produced()));
124125

125-
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
126-
summary.borrow_mut().push(connection.clone());
126+
for (input, entry) in connection {
127+
self.summaries[input].borrow_mut().add_port(new_output, entry);
127128
}
128129

129130
(OutputWrapper::new(buffer, internal), stream)

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
use std::rc::Rc;
77
use std::cell::RefCell;
88

9-
use crate::progress::Antichain;
109
use crate::progress::Timestamp;
1110
use crate::progress::ChangeBatch;
1211
use crate::progress::frontier::MutableAntichain;
12+
use crate::progress::operate::PortConnectivity;
1313
use crate::dataflow::channels::pullers::Counter as PullCounter;
1414
use crate::dataflow::channels::pushers::Counter as PushCounter;
1515
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
@@ -30,7 +30,7 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
3030
///
3131
/// Each timestamp received through this input may only produce output timestamps
3232
/// greater or equal to the input timestamp subjected to at least one of these summaries.
33-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
33+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
3434
logging: Option<Logger>,
3535
}
3636

@@ -149,7 +149,7 @@ pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
149149
pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150150
pull_counter: PullCounter<T, C, P>,
151151
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
152+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
153153
logging: Option<Logger>
154154
) -> InputHandleCore<T, C, P> {
155155
InputHandleCore {

timely/src/logging.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222
use crate::Container;
2323
use crate::container::CapacityContainerBuilder;
2424
use crate::dataflow::operators::capture::{Event, EventPusher};
25+
use crate::progress::operate::Connectivity;
2526

2627
/// Logs events as a timely stream, with progress statements.
2728
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
@@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent<TS> {
7677
/// Worker-unique identifier for the operator.
7778
pub id: usize,
7879
/// Timestamp action summaries for (input, output) pairs.
79-
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
80+
pub summary: Connectivity<TS>,
8081
}
8182

8283
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]

0 commit comments

Comments
 (0)