Skip to content

Commit

Permalink
extract materialization logic for single entry into function
Browse files Browse the repository at this point in the history
Summary: `materialize_artifact_inner` is too loaded, want to lipo it and get rid of recursion

Reviewed By: samkevich

Differential Revision: D69677785

fbshipit-source-id: c19dad8a09fb75d6412d95ea1231cd990852f166
  • Loading branch information
blackm00n authored and facebook-github-bot committed Feb 17, 2025
1 parent f19acc8 commit 0d44e29
Showing 1 changed file with 61 additions and 54 deletions.
115 changes: 61 additions & 54 deletions app/buck2_execute_impl/src/materializers/deferred/command_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use crate::materializers::deferred::subscriptions::MaterializerSubscriptionOpera
use crate::materializers::deferred::subscriptions::MaterializerSubscriptions;
use crate::materializers::deferred::AccessTimesUpdates;
use crate::materializers::deferred::DeferredMaterializerStats;
use crate::materializers::deferred::MaterializeEntryError;
use crate::materializers::deferred::MaterializerReceiver;
use crate::materializers::deferred::MaterializerSender;
use crate::materializers::deferred::SharedMaterializingError;
Expand Down Expand Up @@ -1105,70 +1106,38 @@ impl<T: IoHandler> DeferredMaterializerCommandProcessor<T> {
let materialize_symlink_destination_tasks =
self.materialize_symlink_destination_tasks(&stack, &event_dispatcher, path, deps);

let materialize_entry = if let Some((entry, method)) = entry_and_method {
let io = self.io.dupe();
let path_buf = path.to_buf();
let cancellations = CancellationContext::never_cancelled(); // spawned
Either::Left(async move {
io.materialize_entry(path_buf, method, entry, event_dispatcher, cancellations)
.await
})
} else {
Either::Right(future::ready(Ok(())))
};

// Create a task to await deps and materialize ourselves
let path_buf = path.to_buf();
let path_buf_dup = path_buf.clone();
let io = self.io.dupe();
let command_sender = self.command_sender.dupe();
let task = self
.spawn(async move {
let cancellations = CancellationContext::never_cancelled(); // spawned

// Materialize the deps and this entry. This *must* happen in a try block because we
// need to notify the materializer regardless of whether this succeeds or fails.

let timestamp = Utc::now();
let res: Result<(), SharedMaterializingError> = try {
// If there is an existing future trying to delete conflicting paths, we must wait for it
// to finish before we can start materialization.
if let Some(cleaning_fut) = cleaning_fut {
cleaning_fut
.await
.with_buck_error_context(|| "Error cleaning output path")
.map_err(|e| SharedMaterializingError::Error(e.into()))?;
};

// In case this is a local copy, we first need to materialize the
// artifacts we are copying from, before we can copy them.
for t in materialize_copy_source_tasks {
t.await?;
}

let materialize = if let Some((entry, method)) = entry_and_method {
Either::Left(async move {
io.materialize_entry(
path_buf.clone(),
method,
entry.dupe(),
event_dispatcher.dupe(),
cancellations,
)
.await
})
} else {
Either::Right(future::ready(Ok(())))
};

// Windows symlinks need to be specified whether it is to a file or target. We rely on the
// target file existing to determine this. Ensure symlink targets exist before the entry
// is materialized for Windows. For non-Windows, do everything concurrently.
if cfg!(windows) {
for t in materialize_symlink_destination_tasks {
t.await?;
}
materialize.await?;
} else {
materialize.await?;
for t in materialize_symlink_destination_tasks {
t.await?;
}
}
};
// Materialize the deps and this entry. Regardless of whether this succeeds or fails we
// need to notify the materializer, so don't check the result.
let res = Self::perform_materialization(
cleaning_fut,
materialize_copy_source_tasks,
materialize_symlink_destination_tasks,
materialize_entry,
)
.await;

// Materialization finished, notify the command thread
let _ignored = command_sender.send_low_priority(
LowPriorityMaterializerCommand::MaterializationFinished {
path: path_buf_dup,
path: path_buf,
timestamp,
version,
result: res.dupe(),
Expand All @@ -1190,6 +1159,44 @@ impl<T: IoHandler> DeferredMaterializerCommandProcessor<T> {
Ok(Some(task))
}

async fn perform_materialization(
cleaning_future: Option<CleaningFuture>,
materialize_copy_source_tasks: Vec<MaterializingFuture>,
materialize_symlink_destination_tasks: Vec<MaterializingFuture>,
materialize_entry: impl Future<Output = Result<(), MaterializeEntryError>>,
) -> Result<(), SharedMaterializingError> {
// If there is an existing future trying to delete conflicting paths, we must wait for it
// to finish before we can start materialization.
if let Some(cleaning_fut) = cleaning_future {
cleaning_fut
.await
.with_buck_error_context(|| "Error cleaning output path")
.map_err(|e| SharedMaterializingError::Error(e.into()))?;
};

// In case this is a local copy, we first need to materialize the
// artifacts we are copying from, before we can copy them.
for t in materialize_copy_source_tasks {
t.await?;
}

// Windows symlinks need to be specified whether it is to a file or target. We rely on the
// target file existing to determine this. Ensure symlink targets exist before the entry
// is materialized for Windows. For non-Windows, do everything concurrently.
if cfg!(windows) {
for t in materialize_symlink_destination_tasks {
t.await?;
}
materialize_entry.await?;
} else {
materialize_entry.await?;
for t in materialize_symlink_destination_tasks {
t.await?;
}
}
Ok(())
}

fn materialize_symlink_destination_tasks(
&mut self,
stack: &MaterializeStack,
Expand Down

0 comments on commit 0d44e29

Please sign in to comment.