From 0d44e29fd20c837e48c05897f63d81dbe5476f89 Mon Sep 17 00:00:00 2001 From: Alexey Kozhevnikov Date: Mon, 17 Feb 2025 11:45:02 -0800 Subject: [PATCH] extract materialization logic for single entry into function Summary: `materialize_artifact_inner` is too loaded, want to lipo it and get rid of recursion Reviewed By: samkevich Differential Revision: D69677785 fbshipit-source-id: c19dad8a09fb75d6412d95ea1231cd990852f166 --- .../deferred/command_processor.rs | 115 ++++++++++-------- 1 file changed, 61 insertions(+), 54 deletions(-) diff --git a/app/buck2_execute_impl/src/materializers/deferred/command_processor.rs b/app/buck2_execute_impl/src/materializers/deferred/command_processor.rs index 4139edd090637..e880d077a1f34 100644 --- a/app/buck2_execute_impl/src/materializers/deferred/command_processor.rs +++ b/app/buck2_execute_impl/src/materializers/deferred/command_processor.rs @@ -80,6 +80,7 @@ use crate::materializers::deferred::subscriptions::MaterializerSubscriptionOpera use crate::materializers::deferred::subscriptions::MaterializerSubscriptions; use crate::materializers::deferred::AccessTimesUpdates; use crate::materializers::deferred::DeferredMaterializerStats; +use crate::materializers::deferred::MaterializeEntryError; use crate::materializers::deferred::MaterializerReceiver; use crate::materializers::deferred::MaterializerSender; use crate::materializers::deferred::SharedMaterializingError; @@ -1105,70 +1106,38 @@ impl DeferredMaterializerCommandProcessor { let materialize_symlink_destination_tasks = self.materialize_symlink_destination_tasks(&stack, &event_dispatcher, path, deps); + let materialize_entry = if let Some((entry, method)) = entry_and_method { + let io = self.io.dupe(); + let path_buf = path.to_buf(); + let cancellations = CancellationContext::never_cancelled(); // spawned + Either::Left(async move { + io.materialize_entry(path_buf, method, entry, event_dispatcher, cancellations) + .await + }) + } else { + Either::Right(future::ready(Ok(()))) + }; + // Create a task to await deps and materialize ourselves let path_buf = path.to_buf(); - let path_buf_dup = path_buf.clone(); - let io = self.io.dupe(); let command_sender = self.command_sender.dupe(); let task = self .spawn(async move { - let cancellations = CancellationContext::never_cancelled(); // spawned - - // Materialize the deps and this entry. This *must* happen in a try block because we - // need to notify the materializer regardless of whether this succeeds or fails. - let timestamp = Utc::now(); - let res: Result<(), SharedMaterializingError> = try { - // If there is an existing future trying to delete conflicting paths, we must wait for it - // to finish before we can start materialization. - if let Some(cleaning_fut) = cleaning_fut { - cleaning_fut - .await - .with_buck_error_context(|| "Error cleaning output path") - .map_err(|e| SharedMaterializingError::Error(e.into()))?; - }; - - // In case this is a local copy, we first need to materialize the - // artifacts we are copying from, before we can copy them. - for t in materialize_copy_source_tasks { - t.await?; - } - - let materialize = if let Some((entry, method)) = entry_and_method { - Either::Left(async move { - io.materialize_entry( - path_buf.clone(), - method, - entry.dupe(), - event_dispatcher.dupe(), - cancellations, - ) - .await - }) - } else { - Either::Right(future::ready(Ok(()))) - }; - - // Windows symlinks need to be specified whether it is to a file or target. We rely on the - // target file existing to determine this. Ensure symlink targets exist before the entry - // is materialized for Windows. For non-Windows, do everything concurrently. - if cfg!(windows) { - for t in materialize_symlink_destination_tasks { - t.await?; - } - materialize.await?; - } else { - materialize.await?; - for t in materialize_symlink_destination_tasks { - t.await?; - } - } - }; + // Materialize the deps and this entry. Regardless of whether this succeeds or fails we + // need to notify the materializer, so don't check the result. + let res = Self::perform_materialization( + cleaning_fut, + materialize_copy_source_tasks, + materialize_symlink_destination_tasks, + materialize_entry, + ) + .await; // Materialization finished, notify the command thread let _ignored = command_sender.send_low_priority( LowPriorityMaterializerCommand::MaterializationFinished { - path: path_buf_dup, + path: path_buf, timestamp, version, result: res.dupe(), @@ -1190,6 +1159,44 @@ impl DeferredMaterializerCommandProcessor { Ok(Some(task)) } + async fn perform_materialization( + cleaning_future: Option, + materialize_copy_source_tasks: Vec, + materialize_symlink_destination_tasks: Vec, + materialize_entry: impl Future>, + ) -> Result<(), SharedMaterializingError> { + // If there is an existing future trying to delete conflicting paths, we must wait for it + // to finish before we can start materialization. + if let Some(cleaning_fut) = cleaning_future { + cleaning_fut + .await + .with_buck_error_context(|| "Error cleaning output path") + .map_err(|e| SharedMaterializingError::Error(e.into()))?; + }; + + // In case this is a local copy, we first need to materialize the + // artifacts we are copying from, before we can copy them. + for t in materialize_copy_source_tasks { + t.await?; + } + + // Windows symlinks need to be specified whether it is to a file or target. We rely on the + // target file existing to determine this. Ensure symlink targets exist before the entry + // is materialized for Windows. For non-Windows, do everything concurrently. + if cfg!(windows) { + for t in materialize_symlink_destination_tasks { + t.await?; + } + materialize_entry.await?; + } else { + materialize_entry.await?; + for t in materialize_symlink_destination_tasks { + t.await?; + } + } + Ok(()) + } + fn materialize_symlink_destination_tasks( &mut self, stack: &MaterializeStack,