|
1 |
| -use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin}; |
| 1 | +use std::{ |
| 2 | + any::{Any, TypeId}, |
| 3 | + borrow::Cow, |
| 4 | + future::Future, |
| 5 | + mem::replace, |
| 6 | + panic, |
| 7 | + pin::Pin, |
| 8 | + sync::Arc, |
| 9 | +}; |
2 | 10 |
|
3 | 11 | use anyhow::{anyhow, Result};
|
4 | 12 | use auto_hash_map::AutoSet;
|
5 | 13 | use futures::{StreamExt, TryStreamExt};
|
6 | 14 | use parking_lot::Mutex;
|
7 |
| -use rustc_hash::FxHashSet; |
| 15 | +use rustc_hash::{FxHashMap, FxHashSet}; |
| 16 | +use tokio::task_local; |
8 | 17 | use tracing::{Instrument, Span};
|
9 | 18 |
|
10 | 19 | use crate::{
|
@@ -90,10 +99,10 @@ impl EffectInstance {
|
90 | 99 | listener.await;
|
91 | 100 | }
|
92 | 101 | State::NotStarted(EffectInner { future }) => {
|
93 |
| - let join_handle = tokio::spawn( |
| 102 | + let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope( |
94 | 103 | turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future)
|
95 | 104 | .instrument(Span::current()),
|
96 |
| - ); |
| 105 | + )); |
97 | 106 | let result = match join_handle.await {
|
98 | 107 | Ok(Err(err)) => Err(SharedError::new(err)),
|
99 | 108 | Err(err) => {
|
@@ -170,20 +179,22 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
|
170 | 179 | return Ok(());
|
171 | 180 | }
|
172 | 181 | let span = tracing::info_span!("apply effects", count = effects.len());
|
173 |
| - async move { |
174 |
| - // Limit the concurrency of effects |
175 |
| - futures::stream::iter(effects) |
176 |
| - .map(Ok) |
177 |
| - .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { |
178 |
| - let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) else { |
179 |
| - panic!("Effect must only be implemented by EffectInstance"); |
180 |
| - }; |
181 |
| - effect.await?.apply().await |
182 |
| - }) |
183 |
| - .await |
184 |
| - } |
185 |
| - .instrument(span) |
186 |
| - .await |
| 182 | + APPLY_EFFECTS_CONTEXT |
| 183 | + .scope(Default::default(), async move { |
| 184 | + // Limit the concurrency of effects |
| 185 | + futures::stream::iter(effects) |
| 186 | + .map(Ok) |
| 187 | + .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { |
| 188 | + let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) |
| 189 | + else { |
| 190 | + panic!("Effect must only be implemented by EffectInstance"); |
| 191 | + }; |
| 192 | + effect.await?.apply().await |
| 193 | + }) |
| 194 | + .await |
| 195 | + }) |
| 196 | + .instrument(span) |
| 197 | + .await |
187 | 198 | }
|
188 | 199 |
|
189 | 200 | /// Capture effects from an turbo-tasks operation. Since this captures collectibles it might
|
@@ -252,17 +263,81 @@ impl Effects {
|
252 | 263 | /// Applies all effects that have been captured by this struct.
|
253 | 264 | pub async fn apply(&self) -> Result<()> {
|
254 | 265 | let span = tracing::info_span!("apply effects", count = self.effects.len());
|
255 |
| - async move { |
256 |
| - // Limit the concurrency of effects |
257 |
| - futures::stream::iter(self.effects.iter()) |
258 |
| - .map(Ok) |
259 |
| - .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { |
260 |
| - effect.apply().await |
| 266 | + APPLY_EFFECTS_CONTEXT |
| 267 | + .scope(Default::default(), async move { |
| 268 | + // Limit the concurrency of effects |
| 269 | + futures::stream::iter(self.effects.iter()) |
| 270 | + .map(Ok) |
| 271 | + .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| { |
| 272 | + effect.apply().await |
| 273 | + }) |
| 274 | + .await |
| 275 | + }) |
| 276 | + .instrument(span) |
| 277 | + .await |
| 278 | + } |
| 279 | +} |
| 280 | + |
| 281 | +task_local! { |
| 282 | + /// The context of the current effects application. |
| 283 | + static APPLY_EFFECTS_CONTEXT: Arc<Mutex<ApplyEffectsContext>>; |
| 284 | +} |
| 285 | + |
| 286 | +#[derive(Default)] |
| 287 | +pub struct ApplyEffectsContext { |
| 288 | + data: FxHashMap<TypeId, Box<dyn Any + Send + Sync>>, |
| 289 | +} |
| 290 | + |
| 291 | +impl ApplyEffectsContext { |
| 292 | + fn in_current_scope<F: Future>(f: F) -> impl Future<Output = F::Output> { |
| 293 | + let current = Self::current(); |
| 294 | + APPLY_EFFECTS_CONTEXT.scope(current, f) |
| 295 | + } |
| 296 | + |
| 297 | + fn current() -> Arc<Mutex<Self>> { |
| 298 | + APPLY_EFFECTS_CONTEXT |
| 299 | + .try_with(|mutex| mutex.clone()) |
| 300 | + .expect("No effect context found") |
| 301 | + } |
| 302 | + |
| 303 | + fn with_context<T, F: FnOnce(&mut Self) -> T>(f: F) -> T { |
| 304 | + APPLY_EFFECTS_CONTEXT |
| 305 | + .try_with(|mutex| f(&mut mutex.lock())) |
| 306 | + .expect("No effect context found") |
| 307 | + } |
| 308 | + |
| 309 | + pub fn set<T: Any + Send + Sync>(value: T) { |
| 310 | + Self::with_context(|this| { |
| 311 | + this.data.insert(TypeId::of::<T>(), Box::new(value)); |
| 312 | + }) |
| 313 | + } |
| 314 | + |
| 315 | + pub fn with<T: Any + Send + Sync, R>(f: impl FnOnce(&mut T) -> R) -> Option<R> { |
| 316 | + Self::with_context(|this| { |
| 317 | + this.data |
| 318 | + .get_mut(&TypeId::of::<T>()) |
| 319 | + .map(|value| { |
| 320 | + // Safety: the map is keyed by TypeId |
| 321 | + unsafe { value.downcast_mut_unchecked() } |
261 | 322 | })
|
262 |
| - .await |
263 |
| - } |
264 |
| - .instrument(span) |
265 |
| - .await |
| 323 | + .map(f) |
| 324 | + }) |
| 325 | + } |
| 326 | + |
| 327 | + pub fn with_or_insert_with<T: Any + Send + Sync, R>( |
| 328 | + insert_with: impl FnOnce() -> T, |
| 329 | + f: impl FnOnce(&mut T) -> R, |
| 330 | + ) -> R { |
| 331 | + Self::with_context(|this| { |
| 332 | + let value = this.data.entry(TypeId::of::<T>()).or_insert_with(|| { |
| 333 | + let value = insert_with(); |
| 334 | + Box::new(value) |
| 335 | + }); |
| 336 | + f( |
| 337 | + // Safety: the map is keyed by TypeId |
| 338 | + unsafe { value.downcast_mut_unchecked() }, |
| 339 | + ) |
| 340 | + }) |
266 | 341 | }
|
267 | 342 | }
|
268 | 343 |
|
|
0 commit comments