Skip to content

Commit 02f10d9

Browse files
committed
Remove the use of Rayon iterators
1 parent 69b3959 commit 02f10d9

File tree

14 files changed

+122
-74
lines changed

14 files changed

+122
-74
lines changed

Cargo.lock

+1-13
Original file line numberDiff line numberDiff line change
@@ -3216,17 +3216,6 @@ dependencies = [
32163216
"tikv-jemalloc-sys",
32173217
]
32183218

3219-
[[package]]
3220-
name = "rustc-rayon"
3221-
version = "0.5.1"
3222-
source = "registry+https://github.com/rust-lang/crates.io-index"
3223-
checksum = "2cd9fb077db982d7ceb42a90471e5a69a990b58f71e06f0d8340bb2cf35eb751"
3224-
dependencies = [
3225-
"either",
3226-
"indexmap",
3227-
"rustc-rayon-core",
3228-
]
3229-
32303219
[[package]]
32313220
name = "rustc-rayon-core"
32323221
version = "0.5.0"
@@ -3598,7 +3587,7 @@ dependencies = [
35983587
"parking_lot",
35993588
"portable-atomic",
36003589
"rustc-hash 2.1.1",
3601-
"rustc-rayon",
3590+
"rustc-rayon-core",
36023591
"rustc-stable-hash",
36033592
"rustc_arena",
36043593
"rustc_graphviz",
@@ -3944,7 +3933,6 @@ dependencies = [
39443933
name = "rustc_interface"
39453934
version = "0.0.0"
39463935
dependencies = [
3947-
"rustc-rayon",
39483936
"rustc-rayon-core",
39493937
"rustc_abi",
39503938
"rustc_ast",

compiler/rustc_codegen_cranelift/src/driver/aot.rs

+15-14
Original file line numberDiff line numberDiff line change
@@ -728,26 +728,27 @@ pub(crate) fn run_aot(
728728

729729
let concurrency_limiter = IntoDynSyncSend(ConcurrencyLimiter::new(todo_cgus.len()));
730730

731-
let modules = tcx.sess.time("codegen mono items", || {
732-
let mut modules: Vec<_> = par_map(todo_cgus, |(_, cgu)| {
733-
let dep_node = cgu.codegen_dep_node(tcx);
734-
tcx.dep_graph
735-
.with_task(
731+
let modules: Vec<_> =
732+
tcx.sess.time("codegen mono items", || {
733+
let modules: Vec<_> = par_map(todo_cgus, |(_, cgu)| {
734+
let dep_node = cgu.codegen_dep_node(tcx);
735+
let (module, _) = tcx.dep_graph.with_task(
736736
dep_node,
737737
tcx,
738738
(global_asm_config.clone(), cgu.name(), concurrency_limiter.acquire(tcx.dcx())),
739739
module_codegen,
740740
Some(rustc_middle::dep_graph::hash_result),
741-
)
742-
.0
743-
});
744-
modules.extend(
745-
done_cgus
741+
);
742+
IntoDynSyncSend(module)
743+
});
744+
modules
746745
.into_iter()
747-
.map(|(_, cgu)| OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, cgu))),
748-
);
749-
modules
750-
});
746+
.map(|module| module.0)
747+
.chain(done_cgus.into_iter().map(|(_, cgu)| {
748+
OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, cgu))
749+
}))
750+
.collect()
751+
});
751752

752753
let allocator_module = emit_allocator_module(tcx);
753754

compiler/rustc_codegen_ssa/src/base.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use rustc_ast::expand::allocator::{ALLOCATOR_METHODS, AllocatorKind, global_fn_n
1010
use rustc_attr_parsing::OptimizeAttr;
1111
use rustc_data_structures::fx::{FxHashMap, FxIndexSet};
1212
use rustc_data_structures::profiling::{get_resident_set_size, print_time_passes_entry};
13-
use rustc_data_structures::sync::par_map;
13+
use rustc_data_structures::sync::{IntoDynSyncSend, par_map};
1414
use rustc_data_structures::unord::UnordMap;
1515
use rustc_hir::def_id::{DefId, LOCAL_CRATE};
1616
use rustc_hir::lang_items::LangItem;
@@ -757,7 +757,7 @@ pub fn codegen_crate<B: ExtraBackendMethods>(
757757

758758
let pre_compiled_cgus = par_map(cgus, |(i, _)| {
759759
let module = backend.compile_codegen_unit(tcx, codegen_units[i].name());
760-
(i, module)
760+
(i, IntoDynSyncSend(module))
761761
});
762762

763763
total_codegen_time += start_time.elapsed();
@@ -777,7 +777,7 @@ pub fn codegen_crate<B: ExtraBackendMethods>(
777777
match cgu_reuse {
778778
CguReuse::No => {
779779
let (module, cost) = if let Some(cgu) = pre_compiled_cgus.remove(&i) {
780-
cgu
780+
cgu.0
781781
} else {
782782
let start_time = Instant::now();
783783
let module = backend.compile_codegen_unit(tcx, cgu.name());

compiler/rustc_data_structures/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ indexmap = "2.4.0"
1414
jobserver_crate = { version = "0.1.28", package = "jobserver" }
1515
measureme = "12.0.1"
1616
rustc-hash = "2.0.0"
17-
rustc-rayon = { version = "0.5.1", features = ["indexmap"] }
17+
rustc-rayon-core = { version = "0.5.0" }
1818
rustc-stable-hash = { version = "0.1.0", features = ["nightly"] }
1919
rustc_arena = { path = "../rustc_arena" }
2020
rustc_graphviz = { path = "../rustc_graphviz" }

compiler/rustc_data_structures/src/marker.rs

+13
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ impl<T> FromDyn<T> {
179179
FromDyn(val)
180180
}
181181

182+
#[inline(always)]
183+
pub fn derive<O>(&self, val: O) -> FromDyn<O> {
184+
// We already did the check for `sync::is_dyn_thread_safe()` when creating `Self`
185+
FromDyn(val)
186+
}
187+
182188
#[inline(always)]
183189
pub fn into_inner(self) -> T {
184190
self.0
@@ -200,6 +206,13 @@ impl<T> std::ops::Deref for FromDyn<T> {
200206
}
201207
}
202208

209+
impl<T> std::ops::DerefMut for FromDyn<T> {
210+
#[inline(always)]
211+
fn deref_mut(&mut self) -> &mut Self::Target {
212+
&mut self.0
213+
}
214+
}
215+
203216
// A wrapper to convert a struct that is already a `Send` or `Sync` into
204217
// an instance of `DynSend` and `DynSync`, since the compiler cannot infer
205218
// it automatically in some cases. (e.g. Box<dyn Send / Sync>)

compiler/rustc_data_structures/src/sync/parallel.rs

+75-29
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::any::Any;
77
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
88

99
use parking_lot::Mutex;
10-
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
1110

1211
use crate::FatalErrorMarker;
1312
use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
@@ -97,11 +96,11 @@ macro_rules! parallel {
9796
// This function only works when `mode::is_dyn_thread_safe()`.
9897
pub fn scope<'scope, OP, R>(op: OP) -> R
9998
where
100-
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
99+
OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
101100
R: DynSend,
102101
{
103102
let op = FromDyn::from(op);
104-
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
103+
rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
105104
}
106105

107106
#[inline]
@@ -114,7 +113,7 @@ where
114113
let oper_a = FromDyn::from(oper_a);
115114
let oper_b = FromDyn::from(oper_b);
116115
let (a, b) = parallel_guard(|guard| {
117-
rayon::join(
116+
rayon_core::join(
118117
move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
119118
move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
120119
)
@@ -125,56 +124,103 @@ where
125124
}
126125
}
127126

128-
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
127+
fn par_slice<I: DynSend>(
128+
items: &mut [I],
129+
guard: &ParallelGuard,
130+
for_each: impl Fn(&mut I) + DynSync + DynSend,
131+
) {
132+
struct State<'a, F> {
133+
for_each: FromDyn<F>,
134+
guard: &'a ParallelGuard,
135+
group: usize,
136+
}
137+
138+
fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
139+
items: &mut [I],
140+
state: &State<'_, F>,
141+
) {
142+
if items.len() <= state.group {
143+
for item in items {
144+
state.guard.run(|| (state.for_each)(item));
145+
}
146+
} else {
147+
let (left, right) = items.split_at_mut(items.len() / 2);
148+
let mut left = state.for_each.derive(left);
149+
let mut right = state.for_each.derive(right);
150+
rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
151+
}
152+
}
153+
154+
let state = State {
155+
for_each: FromDyn::from(for_each),
156+
guard,
157+
group: std::cmp::max(items.len() / 128, 1),
158+
};
159+
par_rec(items, &state)
160+
}
161+
162+
pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
129163
t: T,
130-
for_each: impl Fn(I) + DynSync + DynSend,
164+
for_each: impl Fn(&I) + DynSync + DynSend,
131165
) {
132166
parallel_guard(|guard| {
133167
if mode::is_dyn_thread_safe() {
134-
let for_each = FromDyn::from(for_each);
135-
t.into_par_iter().for_each(|i| {
136-
guard.run(|| for_each(i));
137-
});
168+
let mut items: Vec<_> = t.into_iter().collect();
169+
par_slice(&mut items, guard, |i| for_each(&*i))
138170
} else {
139171
t.into_iter().for_each(|i| {
140-
guard.run(|| for_each(i));
172+
guard.run(|| for_each(&i));
141173
});
142174
}
143175
});
144176
}
145177

146-
pub fn try_par_for_each_in<
147-
T: IntoIterator + IntoParallelIterator<Item = <T as IntoIterator>::Item>,
148-
E: Send,
149-
>(
178+
/// This runs `for_each` in parallel for each iterator item. If one or more of the
179+
/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned
180+
/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which
181+
/// are all equivalent.
182+
pub fn try_par_for_each_in<T: IntoIterator, E: DynSend>(
150183
t: T,
151-
for_each: impl Fn(<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
152-
) -> Result<(), E> {
184+
for_each: impl Fn(&<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
185+
) -> Result<(), E>
186+
where
187+
<T as IntoIterator>::Item: DynSend,
188+
{
153189
parallel_guard(|guard| {
154190
if mode::is_dyn_thread_safe() {
155-
let for_each = FromDyn::from(for_each);
156-
t.into_par_iter()
157-
.filter_map(|i| guard.run(|| for_each(i)))
158-
.reduce(|| Ok(()), Result::and)
191+
let mut items: Vec<_> = t.into_iter().collect();
192+
193+
let error = Mutex::new(None);
194+
195+
par_slice(&mut items, guard, |i| {
196+
if let Err(err) = for_each(&*i) {
197+
*error.lock() = Some(err);
198+
}
199+
});
200+
201+
if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) }
159202
} else {
160-
t.into_iter().filter_map(|i| guard.run(|| for_each(i))).fold(Ok(()), Result::and)
203+
t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and)
161204
}
162205
})
163206
}
164207

165-
pub fn par_map<
166-
I,
167-
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
168-
R: std::marker::Send,
169-
C: FromIterator<R> + FromParallelIterator<R>,
170-
>(
208+
pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
171209
t: T,
172210
map: impl Fn(I) -> R + DynSync + DynSend,
173211
) -> C {
174212
parallel_guard(|guard| {
175213
if mode::is_dyn_thread_safe() {
176214
let map = FromDyn::from(map);
177-
t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
215+
216+
let mut items: Vec<(Option<I>, Option<R>)> =
217+
t.into_iter().map(|i| (Some(i), None)).collect();
218+
219+
par_slice(&mut items, guard, |i| {
220+
i.1 = Some(map(i.0.take().unwrap()));
221+
});
222+
223+
items.into_iter().filter_map(|i| i.1).collect()
178224
} else {
179225
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
180226
}

compiler/rustc_interface/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ edition = "2024"
55

66
[dependencies]
77
# tidy-alphabetical-start
8-
rustc-rayon = { version = "0.5.0" }
98
rustc-rayon-core = { version = "0.5.0" }
109
rustc_ast = { path = "../rustc_ast" }
1110
rustc_ast_lowering = { path = "../rustc_ast_lowering" }

compiler/rustc_interface/src/util.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
179179
let current_gcx = FromDyn::from(CurrentGcx::new());
180180
let current_gcx2 = current_gcx.clone();
181181

182-
let builder = rayon::ThreadPoolBuilder::new()
182+
let builder = rayon_core::ThreadPoolBuilder::new()
183183
.thread_name(|_| "rustc".to_string())
184184
.acquire_thread_handler(jobserver::acquire_thread)
185185
.release_thread_handler(jobserver::release_thread)
@@ -236,7 +236,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
236236
builder
237237
.build_scoped(
238238
// Initialize each new worker thread when created.
239-
move |thread: rayon::ThreadBuilder| {
239+
move |thread: rayon_core::ThreadBuilder| {
240240
// Register the thread for use with the `WorkerLocal` type.
241241
registry.register();
242242

@@ -245,7 +245,9 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
245245
})
246246
},
247247
// Run `f` on the first thread in the thread pool.
248-
move |pool: &rayon::ThreadPool| pool.install(|| f(current_gcx.into_inner())),
248+
move |pool: &rayon_core::ThreadPool| {
249+
pool.install(|| f(current_gcx.into_inner()))
250+
},
249251
)
250252
.unwrap()
251253
})

compiler/rustc_metadata/src/rmeta/encoder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2199,7 +2199,7 @@ fn prefetch_mir(tcx: TyCtxt<'_>) {
21992199
}
22002200

22012201
let reachable_set = tcx.reachable_set(());
2202-
par_for_each_in(tcx.mir_keys(()), |&def_id| {
2202+
par_for_each_in(tcx.mir_keys(()), |&&def_id| {
22032203
let (encode_const, encode_opt) = should_encode_mir(tcx, reachable_set, def_id);
22042204

22052205
if encode_const {

compiler/rustc_middle/src/hir/map.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ impl<'tcx> TyCtxt<'tcx> {
343343

344344
#[inline]
345345
pub fn par_hir_body_owners(self, f: impl Fn(LocalDefId) + DynSend + DynSync) {
346-
par_for_each_in(&self.hir_crate_items(()).body_owners[..], |&def_id| f(def_id));
346+
par_for_each_in(&self.hir_crate_items(()).body_owners[..], |&&def_id| f(def_id));
347347
}
348348

349349
pub fn hir_ty_param_owner(self, def_id: LocalDefId) -> LocalDefId {

compiler/rustc_middle/src/hir/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -83,35 +83,35 @@ impl ModuleItems {
8383
&self,
8484
f: impl Fn(ItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
8585
) -> Result<(), ErrorGuaranteed> {
86-
try_par_for_each_in(&self.free_items[..], |&id| f(id))
86+
try_par_for_each_in(&self.free_items[..], |&&id| f(id))
8787
}
8888

8989
pub fn par_trait_items(
9090
&self,
9191
f: impl Fn(TraitItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
9292
) -> Result<(), ErrorGuaranteed> {
93-
try_par_for_each_in(&self.trait_items[..], |&id| f(id))
93+
try_par_for_each_in(&self.trait_items[..], |&&id| f(id))
9494
}
9595

9696
pub fn par_impl_items(
9797
&self,
9898
f: impl Fn(ImplItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
9999
) -> Result<(), ErrorGuaranteed> {
100-
try_par_for_each_in(&self.impl_items[..], |&id| f(id))
100+
try_par_for_each_in(&self.impl_items[..], |&&id| f(id))
101101
}
102102

103103
pub fn par_foreign_items(
104104
&self,
105105
f: impl Fn(ForeignItemId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
106106
) -> Result<(), ErrorGuaranteed> {
107-
try_par_for_each_in(&self.foreign_items[..], |&id| f(id))
107+
try_par_for_each_in(&self.foreign_items[..], |&&id| f(id))
108108
}
109109

110110
pub fn par_opaques(
111111
&self,
112112
f: impl Fn(LocalDefId) -> Result<(), ErrorGuaranteed> + DynSend + DynSync,
113113
) -> Result<(), ErrorGuaranteed> {
114-
try_par_for_each_in(&self.opaques[..], |&id| f(id))
114+
try_par_for_each_in(&self.opaques[..], |&&id| f(id))
115115
}
116116
}
117117

0 commit comments

Comments
 (0)