Skip to content

Commit a92df78

Browse files
committed
Log frontiers in progress tracking
1 parent 9705303 commit a92df78

File tree

1 file changed

+144
-42
lines changed

1 file changed

+144
-42
lines changed

timely/src/progress/reachability.rs

Lines changed: 144 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -573,7 +573,7 @@ impl<T:Timestamp> Tracker<T> {
573573
.collect::<Vec<_>>();
574574

575575
if !target_changes.is_empty() {
576-
logger.log_target_updates(Box::new(target_changes));
576+
logger.log_target_pointstamp_updates(Box::new(target_changes));
577577
}
578578

579579
let source_changes =
@@ -583,7 +583,7 @@ impl<T:Timestamp> Tracker<T> {
583583
.collect::<Vec<_>>();
584584

585585
if !source_changes.is_empty() {
586-
logger.log_source_updates(Box::new(source_changes));
586+
logger.log_source_pointstamp_updates(Box::new(source_changes));
587587
}
588588
}
589589

@@ -670,6 +670,7 @@ impl<T:Timestamp> Tracker<T> {
670670
}
671671
self.pushed_changes.update((location, time), diff);
672672
}
673+
673674
}
674675
// Update to an operator output.
675676
// Propagate any changes forward along outgoing edges.
@@ -696,6 +697,30 @@ impl<T:Timestamp> Tracker<T> {
696697
}
697698
}
698699

700+
// Step 3: If logging is enabled, split pushed changes by port kind and log them as frontier updates.
701+
if let Some(logger) = &mut self.logger {
702+
let mut target_changes = Vec::new();
703+
let mut source_changes = Vec::new();
704+
705+
for ((location, time), diff) in self.pushed_changes.iter() {
706+
match location.port {
707+
Port::Target(port) => {
708+
target_changes.push((location.node, port, time.clone(), *diff))
709+
}
710+
Port::Source(port) => {
711+
source_changes.push((location.node, port, time.clone(), *diff))
712+
}
713+
}
714+
}
715+
716+
if !target_changes.is_empty() || !source_changes.is_empty() {
717+
logger.log_frontier_updates( // parameter order is (source_updates, target_updates)
718+
Box::new(source_changes),
719+
Box::new(target_changes),
720+
);
721+
}
722+
}
723+
699724
self.pushed_changes.drain()
700725
}
701726

@@ -839,56 +864,113 @@ pub mod logging {
839864
Self { path, logger }
840865
}
841866

842-
/// Log source update events with additional identifying information.
843-
pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
867+
/// Log source pointstamp update events with additional identifying information.
868+
pub fn log_source_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
844869
self.logger.log({
845-
SourceUpdate {
870+
SourcePointstampUpdate {
846871
tracker_id: self.path.clone(),
847872
updates,
848873
}
849874
})
850875
}
851-
/// Log target update events with additional identifying information.
852-
pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
876+
/// Log target pointstamp update events with additional identifying information.
877+
pub fn log_target_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
853878
self.logger.log({
854-
TargetUpdate {
879+
TargetPointstampUpdate {
855880
tracker_id: self.path.clone(),
856881
updates,
857882
}
858883
})
859884
}
885+
886+
/// Log frontier update events with additional identifying information.
887+
///
888+
/// We want to log source and target updates at the same time to ensure callers observe
889+
/// consistent frontiers at any point in time. `source_updates` (first) is wrapped in a `SourceFrontierUpdate` and `target_updates` (second) in a `TargetFrontierUpdate`; both carry a clone of this logger's `path` as `tracker_id` and are handed to the logger in a single `log_many` call, source event first.
890+
pub fn log_frontier_updates(
891+
&mut self,
892+
source_updates: Box<dyn ProgressEventTimestampVec>,
893+
target_updates: Box<dyn ProgressEventTimestampVec>,
894+
) {
895+
let source_event: TrackerEvent = SourceFrontierUpdate {
896+
tracker_id: self.path.clone(),
897+
updates: source_updates,
898+
}.into();
899+
let target_event: TrackerEvent = TargetFrontierUpdate {
900+
tracker_id: self.path.clone(),
901+
updates: target_updates,
902+
}.into();
903+
904+
self.logger.log_many([source_event, target_event]);
905+
}
860906
}
861907

862908
/// Events that the tracker may record.
863909
pub enum TrackerEvent {
864-
/// Updates made at a source of data.
865-
SourceUpdate(SourceUpdate),
866-
/// Updates made at a target of data.
867-
TargetUpdate(TargetUpdate),
910+
/// Pointstamp updates made at a source of data.
911+
SourcePointstampUpdate(SourcePointstampUpdate),
912+
/// Pointstamp updates made at a target of data.
913+
TargetPointstampUpdate(TargetPointstampUpdate),
914+
/// Frontier updates made at a source of data.
915+
SourceFrontierUpdate(SourceFrontierUpdate),
916+
/// Frontier updates made at a target of data.
917+
TargetFrontierUpdate(TargetFrontierUpdate),
918+
}
919+
920+
/// A pointstamp update made at a source of data.
921+
pub struct SourcePointstampUpdate {
922+
/// An identifier for the tracker.
923+
pub tracker_id: Vec<usize>,
924+
/// Updates themselves, as `(node, port, time, diff)`.
925+
pub updates: Box<dyn ProgressEventTimestampVec>,
926+
}
927+
928+
/// A pointstamp update made at a target of data.
929+
pub struct TargetPointstampUpdate {
930+
/// An identifier for the tracker.
931+
pub tracker_id: Vec<usize>,
932+
/// Updates themselves, as `(node, port, time, diff)`.
933+
pub updates: Box<dyn ProgressEventTimestampVec>,
868934
}
869935

870-
/// An update made at a source of data.
871-
pub struct SourceUpdate {
936+
/// A frontier update at a source of data.
937+
pub struct SourceFrontierUpdate {
872938
/// An identifier for the tracker.
873939
pub tracker_id: Vec<usize>,
874940
/// Updates themselves, as `(node, port, time, diff)`.
875941
pub updates: Box<dyn ProgressEventTimestampVec>,
876942
}
877943

878-
/// An update made at a target of data.
879-
pub struct TargetUpdate {
944+
/// A frontier update at a target of data.
945+
pub struct TargetFrontierUpdate {
880946
/// An identifier for the tracker.
881947
pub tracker_id: Vec<usize>,
882948
/// Updates themselves, as `(node, port, time, diff)`.
883949
pub updates: Box<dyn ProgressEventTimestampVec>,
884950
}
885951

886-
impl From<SourceUpdate> for TrackerEvent {
887-
fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) }
952+
impl From<SourcePointstampUpdate> for TrackerEvent {
953+
fn from(v: SourcePointstampUpdate) -> Self {
954+
Self::SourcePointstampUpdate(v)
955+
}
956+
}
957+
958+
impl From<TargetPointstampUpdate> for TrackerEvent {
959+
fn from(v: TargetPointstampUpdate) -> Self {
960+
Self::TargetPointstampUpdate(v)
961+
}
962+
}
963+
964+
impl From<SourceFrontierUpdate> for TrackerEvent {
965+
fn from(v: SourceFrontierUpdate) -> Self {
966+
Self::SourceFrontierUpdate(v)
967+
}
888968
}
889969

890-
impl From<TargetUpdate> for TrackerEvent {
891-
fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
970+
impl From<TargetFrontierUpdate> for TrackerEvent {
971+
fn from(v: TargetFrontierUpdate) -> Self {
972+
Self::TargetFrontierUpdate(v)
973+
}
892974
}
893975
}
894976

@@ -906,32 +988,52 @@ impl<T: Timestamp> Drop for Tracker<T> {
906988
};
907989

908990
// Retract pending data that `propagate_all` would normally log.
991+
let mut target_pointstamp_changes = Vec::new();
992+
let mut source_pointstamp_changes = Vec::new();
993+
let mut target_frontier_changes = Vec::new();
994+
let mut source_frontier_changes = Vec::new();
995+
909996
for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
910-
let target_changes = per_operator.targets
911-
.iter_mut()
912-
.enumerate()
913-
.flat_map(|(port, target)| {
914-
target.pointstamps
915-
.updates()
916-
.map(move |(time, diff)| (index, port, time.clone(), -diff))
917-
})
918-
.collect::<Vec<_>>();
919-
if !target_changes.is_empty() {
920-
logger.log_target_updates(Box::new(target_changes));
997+
for (port, target) in per_operator.targets.iter_mut().enumerate() {
998+
let pointstamp_retractions = target.pointstamps
999+
.updates()
1000+
.map(|(time, diff)| (index, port, time.clone(), -diff));
1001+
target_pointstamp_changes.extend(pointstamp_retractions);
1002+
1003+
let frontier = target.implications.frontier().to_owned();
1004+
let frontier_retractions = frontier
1005+
.into_iter()
1006+
.map(|time| (index, port, time, -1));
1007+
target_frontier_changes.extend(frontier_retractions);
9211008
}
1009+
}
9221010

923-
let source_changes = per_operator.sources
924-
.iter_mut()
925-
.enumerate()
926-
.flat_map(|(port, source)| {
927-
source.pointstamps
928-
.updates()
929-
.map(move |(time, diff)| (index, port, time.clone(), -diff))
930-
})
931-
.collect::<Vec<_>>();
932-
if !source_changes.is_empty() {
933-
logger.log_source_updates(Box::new(source_changes));
1011+
for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
1012+
for (port, source) in per_operator.sources.iter_mut().enumerate() {
1013+
let pointstamp_retractions = source.pointstamps
1014+
.updates()
1015+
.map(|(time, diff)| (index, port, time.clone(), -diff));
1016+
source_pointstamp_changes.extend(pointstamp_retractions);
1017+
1018+
let frontier = source.implications.frontier().to_owned();
1019+
let frontier_retractions = frontier
1020+
.into_iter()
1021+
.map(|time| (index, port, time, -1));
1022+
source_frontier_changes.extend(frontier_retractions);
9341023
}
9351024
}
1025+
1026+
if !target_pointstamp_changes.is_empty() {
1027+
logger.log_target_pointstamp_updates(Box::new(target_pointstamp_changes));
1028+
}
1029+
if !source_pointstamp_changes.is_empty() {
1030+
logger.log_source_pointstamp_updates(Box::new(source_pointstamp_changes));
1031+
}
1032+
if !source_frontier_changes.is_empty() || !target_frontier_changes.is_empty() {
1033+
logger.log_frontier_updates(
1034+
Box::new(source_frontier_changes),
1035+
Box::new(target_frontier_changes),
1036+
);
1037+
}
9361038
}
9371039
}

0 commit comments

Comments
 (0)