rustc_data_structures/sync/
parallel.rs1#![allow(dead_code)]
5
6use std::any::Any;
7use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
8
9use parking_lot::Mutex;
10
11use crate::FatalErrorMarker;
12use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
13
14pub struct ParallelGuard {
20 panic: Mutex<Option<IntoDynSyncSend<Box<dyn Any + Send + 'static>>>>,
21}
22
23impl ParallelGuard {
24 pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
25 catch_unwind(AssertUnwindSafe(f))
26 .map_err(|err| {
27 let mut panic = self.panic.lock();
28 if panic.is_none() || !(*err).is::<FatalErrorMarker>() {
29 *panic = Some(IntoDynSyncSend(err));
30 }
31 })
32 .ok()
33 }
34}
35
36#[inline]
39pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
40 let guard = ParallelGuard { panic: Mutex::new(None) };
41 let ret = f(&guard);
42 if let Some(IntoDynSyncSend(panic)) = guard.panic.into_inner() {
43 resume_unwind(panic);
44 }
45 ret
46}
47
48fn serial_join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
49where
50 A: FnOnce() -> RA,
51 B: FnOnce() -> RB,
52{
53 let (a, b) = parallel_guard(|guard| {
54 let a = guard.run(oper_a);
55 let b = guard.run(oper_b);
56 (a, b)
57 });
58 (a.unwrap(), b.unwrap())
59}
60
/// Runs a list of blocks under a single `parallel_guard`, each wrapped in `guard.run`.
///
/// Usage: `parallel!({ first }, { second }, ...)` — a first block, a comma, then zero
/// or more comma-separated blocks.
///
/// When `is_dyn_thread_safe()` is true, every block after the first is spawned on a
/// `scope`, and the first block is run directly inside the scope closure after the
/// spawns have been issued. Otherwise the blocks are run one after another in the
/// order they are written.
#[macro_export]
macro_rules! parallel {
    // Internal: move the next pending block to the *front* of the accumulated list.
    // Once the pending list is empty the accumulated list is therefore in reverse
    // order of how the blocks were written.
    // NOTE(review): the reversal is presumably there to influence the order in which
    // the spawned blocks get executed — confirm against the thread-pool behaviour.
    (impl $first:block [$($acc:expr,)*] [$head:expr $(, $tail:expr)*]) => {
        parallel!(impl $first [$head, $($acc,)*] [$($tail),*])
    };
    // Internal: nothing left pending; spawn every accumulated block, then run the
    // first block in the scope closure itself.
    (impl $first:block [$($task:expr,)*] []) => {
        $crate::sync::parallel_guard(|guard| {
            $crate::sync::scope(|s| {
                $(
                    let job = $crate::sync::FromDyn::from(|| $task);
                    s.spawn(move |_| {
                        guard.run(move || job.into_inner()());
                    });
                )*
                guard.run(|| $first);
            });
        });
    };
    // Public entry point.
    ($first:block, $($later:block),*) => {
        if $crate::sync::is_dyn_thread_safe() {
            parallel!(impl $first [] [$($later),*]);
        } else {
            $crate::sync::parallel_guard(|guard| {
                guard.run(|| $first);
                $(guard.run(|| $later);)*
            });
        }
    };
}
95
96pub fn scope<'scope, OP, R>(op: OP) -> R
98where
99 OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
100 R: DynSend,
101{
102 let op = FromDyn::from(op);
103 rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
104}
105
106#[inline]
107pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
108where
109 A: FnOnce() -> RA + DynSend,
110 B: FnOnce() -> RB + DynSend,
111{
112 if mode::is_dyn_thread_safe() {
113 let oper_a = FromDyn::from(oper_a);
114 let oper_b = FromDyn::from(oper_b);
115 let (a, b) = parallel_guard(|guard| {
116 rayon_core::join(
117 move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
118 move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
119 )
120 });
121 (a.unwrap().into_inner(), b.unwrap().into_inner())
122 } else {
123 serial_join(oper_a, oper_b)
124 }
125}
126
127fn par_slice<I: DynSend>(
128 items: &mut [I],
129 guard: &ParallelGuard,
130 for_each: impl Fn(&mut I) + DynSync + DynSend,
131) {
132 struct State<'a, F> {
133 for_each: FromDyn<F>,
134 guard: &'a ParallelGuard,
135 group: usize,
136 }
137
138 fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
139 items: &mut [I],
140 state: &State<'_, F>,
141 ) {
142 if items.len() <= state.group {
143 for item in items {
144 state.guard.run(|| (state.for_each)(item));
145 }
146 } else {
147 let (left, right) = items.split_at_mut(items.len() / 2);
148 let mut left = state.for_each.derive(left);
149 let mut right = state.for_each.derive(right);
150 rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
151 }
152 }
153
154 let state = State {
155 for_each: FromDyn::from(for_each),
156 guard,
157 group: std::cmp::max(items.len() / 128, 1),
158 };
159 par_rec(items, &state)
160}
161
162pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
163 t: T,
164 for_each: impl Fn(&I) + DynSync + DynSend,
165) {
166 parallel_guard(|guard| {
167 if mode::is_dyn_thread_safe() {
168 let mut items: Vec<_> = t.into_iter().collect();
169 par_slice(&mut items, guard, |i| for_each(&*i))
170 } else {
171 t.into_iter().for_each(|i| {
172 guard.run(|| for_each(&i));
173 });
174 }
175 });
176}
177
178pub fn try_par_for_each_in<T: IntoIterator, E: DynSend>(
183 t: T,
184 for_each: impl Fn(&<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
185) -> Result<(), E>
186where
187 <T as IntoIterator>::Item: DynSend,
188{
189 parallel_guard(|guard| {
190 if mode::is_dyn_thread_safe() {
191 let mut items: Vec<_> = t.into_iter().collect();
192
193 let error = Mutex::new(None);
194
195 par_slice(&mut items, guard, |i| {
196 if let Err(err) = for_each(&*i) {
197 *error.lock() = Some(err);
198 }
199 });
200
201 if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) }
202 } else {
203 t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and)
204 }
205 })
206}
207
208pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
209 t: T,
210 map: impl Fn(I) -> R + DynSync + DynSend,
211) -> C {
212 parallel_guard(|guard| {
213 if mode::is_dyn_thread_safe() {
214 let map = FromDyn::from(map);
215
216 let mut items: Vec<(Option<I>, Option<R>)> =
217 t.into_iter().map(|i| (Some(i), None)).collect();
218
219 par_slice(&mut items, guard, |i| {
220 i.1 = Some(map(i.0.take().unwrap()));
221 });
222
223 items.into_iter().filter_map(|i| i.1).collect()
224 } else {
225 t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
226 }
227 })
228}