Skip to content

Commit 576e04c

Browse files
authored
[Turbopack] wait for output instead of waiting for task completion (#75438)
### What? Allow to continue follow-up tasks even if a task is not completely finished. It can do task completion work in background. This enables more concurrency, especially since all task connecting is at the end of the task now.
1 parent b46ffa9 commit 576e04c

File tree

1 file changed

+42
-19
lines changed
  • turbopack/crates/turbo-tasks-backend/src/backend

1 file changed

+42
-19
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

+42-19
Original file line numberDiff line numberDiff line change
@@ -412,31 +412,41 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
412412
let mut ctx = self.execute_context(turbo_tasks);
413413
let mut task = ctx.task(task_id, TaskDataCategory::All);
414414

415+
fn listen_to_done_event<B: BackingStorage>(
416+
this: &TurboTasksBackendInner<B>,
417+
reader: Option<TaskId>,
418+
done_event: &Event,
419+
) -> EventListener {
420+
let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
421+
let listener = done_event.listen_with_note(move || {
422+
if let Some(reader_desc) = reader_desc.as_ref() {
423+
format!("try_read_task_output from {}", reader_desc())
424+
} else {
425+
"try_read_task_output (untracked)".to_string()
426+
}
427+
});
428+
listener
429+
}
430+
415431
fn check_in_progress<B: BackingStorage>(
416432
this: &TurboTasksBackendInner<B>,
417433
task: &impl TaskGuard,
418434
reader: Option<TaskId>,
419435
) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
420436
{
421-
if let Some(in_progress) = get!(task, InProgress) {
422-
match in_progress {
423-
InProgressState::Scheduled { done_event, .. }
424-
| InProgressState::InProgress(box InProgressStateInner {
425-
done_event, ..
426-
}) => {
427-
let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
428-
let listener = done_event.listen_with_note(move || {
429-
if let Some(reader_desc) = reader_desc.as_ref() {
430-
format!("try_read_task_output from {}", reader_desc())
431-
} else {
432-
"try_read_task_output (untracked)".to_string()
433-
}
434-
});
435-
return Some(Ok(Err(listener)));
436-
}
437+
match get!(task, InProgress) {
438+
Some(InProgressState::Scheduled { done_event, .. }) => {
439+
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
440+
}
441+
Some(InProgressState::InProgress(box InProgressStateInner {
442+
marked_as_completed,
443+
done_event,
444+
..
445+
})) if !*marked_as_completed => {
446+
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
437447
}
448+
_ => None,
438449
}
439-
None
440450
}
441451

442452
if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
@@ -1162,6 +1172,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
11621172
};
11631173
let &mut InProgressState::InProgress(box InProgressStateInner {
11641174
stale,
1175+
ref mut marked_as_completed,
1176+
ref done_event,
11651177
ref mut new_children,
11661178
..
11671179
}) = in_progress
@@ -1194,6 +1206,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
11941206
);
11951207
return true;
11961208
}
1209+
1210+
// mark the task as completed, so dependent tasks can continue working
1211+
if !*marked_as_completed {
1212+
*marked_as_completed = true;
1213+
done_event.notify(usize::MAX);
1214+
}
1215+
1216+
// take the children from the task to process them
11971217
let mut new_children = take(new_children);
11981218

11991219
// TODO handle stateful
@@ -1452,8 +1472,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
14521472

14531473
drop(task);
14541474

1455-
done_event.notify(usize::MAX);
1456-
14571475
if let Some(data_update) = data_update {
14581476
AggregationUpdateQueue::run(data_update, &mut ctx);
14591477
}
@@ -1740,11 +1758,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
17401758
let mut task = ctx.task(task, TaskDataCategory::Data);
17411759
if let Some(InProgressState::InProgress(box InProgressStateInner {
17421760
marked_as_completed,
1761+
done_event,
17431762
..
17441763
})) = get_mut!(task, InProgress)
17451764
{
17461765
*marked_as_completed = true;
1766+
done_event.notify(usize::MAX);
17471767
// TODO this should remove the dirty state (also check session_dependent)
1768+
// but this would break some assumptions for strongly consistent reads.
1769+
// Client tasks are not connected yet, so we wouldn't wait for them.
1770+
// Maybe that's ok in cases where mark_finished() is used? Seems like it?
17481771
}
17491772
}
17501773

0 commit comments

Comments
 (0)