Skip to main content

rustc_data_structures/sync/
parallel.rs

//! This module defines parallel operations that are implemented in
//! one way for the serial compiler, and another way for the parallel compiler.
3
4use std::any::Any;
5use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
6
7use parking_lot::Mutex;
8
9use crate::FatalErrorMarker;
10use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
11
12/// A guard used to hold panics that occur during a parallel section to later by unwound.
13/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
14/// hiding errors by ensuring that everything in the section has completed executing before
15/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
16/// output match the parallel compiler for testing purposes.
17pub struct ParallelGuard {
18    panic: Mutex<Option<IntoDynSyncSend<Box<dyn Any + Send + 'static>>>>,
19}
20
21impl ParallelGuard {
22    pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
23        catch_unwind(AssertUnwindSafe(f))
24            .map_err(|err| {
25                let mut panic = self.panic.lock();
26                if panic.is_none() || !(*err).is::<FatalErrorMarker>() {
27                    *panic = Some(IntoDynSyncSend(err));
28                }
29            })
30            .ok()
31    }
32}
33
34/// This gives access to a fresh parallel guard in the closure and will unwind any panics
35/// caught in it after the closure returns.
36#[inline]
37pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
38    let guard = ParallelGuard { panic: Mutex::new(None) };
39    let ret = f(&guard);
40    if let Some(IntoDynSyncSend(panic)) = guard.panic.into_inner() {
41        resume_unwind(panic);
42    }
43    ret
44}
45
46fn serial_join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
47where
48    A: FnOnce() -> RA,
49    B: FnOnce() -> RB,
50{
51    let (a, b) = parallel_guard(|guard| {
52        let a = guard.run(oper_a);
53        let b = guard.run(oper_b);
54        (a, b)
55    });
56    (a.unwrap(), b.unwrap())
57}
58
59pub fn spawn(func: impl FnOnce() + DynSend + 'static) {
60    if let Some(proof) = mode::check_dyn_thread_safe() {
61        let func = proof.derive(func);
62        rustc_thread_pool::spawn(|| {
63            (func.into_inner())();
64        });
65    } else {
66        func()
67    }
68}
69
70/// Runs the functions in parallel.
71///
72/// The first function is executed immediately on the current thread.
73/// Use that for the longest running function for better scheduling.
74pub fn par_fns(funcs: &mut [&mut (dyn FnMut() + DynSend)]) {
75    parallel_guard(|guard: &ParallelGuard| {
76        if let Some(proof) = mode::check_dyn_thread_safe() {
77            let funcs = proof.derive(funcs);
78            rustc_thread_pool::scope(|s| {
79                let Some((first, rest)) = funcs.into_inner().split_at_mut_checked(1) else {
80                    return;
81                };
82
83                // Reverse the order of the later functions since Rayon executes them in reverse
84                // order when using a single thread. This ensures the execution order matches
85                // that of a single threaded rustc.
86                for f in rest.iter_mut().rev() {
87                    let f = proof.derive(f);
88                    s.spawn(|_| {
89                        guard.run(|| (f.into_inner())());
90                    });
91                }
92
93                // Run the first function without spawning to
94                // ensure it executes immediately on this thread.
95                guard.run(|| first[0]());
96            });
97        } else {
98            for f in funcs {
99                guard.run(|| f());
100            }
101        }
102    });
103}
104
105#[inline]
106pub fn par_join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
107where
108    A: FnOnce() -> RA + DynSend,
109    B: FnOnce() -> RB + DynSend,
110{
111    if let Some(proof) = mode::check_dyn_thread_safe() {
112        let oper_a = proof.derive(oper_a);
113        let oper_b = proof.derive(oper_b);
114        let (a, b) = parallel_guard(|guard| {
115            rustc_thread_pool::join(
116                move || guard.run(move || proof.derive(oper_a.into_inner()())),
117                move || guard.run(move || proof.derive(oper_b.into_inner()())),
118            )
119        });
120        (a.unwrap().into_inner(), b.unwrap().into_inner())
121    } else {
122        serial_join(oper_a, oper_b)
123    }
124}
125
126fn par_slice<I: DynSend>(
127    items: &mut [I],
128    guard: &ParallelGuard,
129    for_each: impl Fn(&mut I) + DynSync + DynSend,
130    proof: FromDyn<()>,
131) {
132    match items {
133        [] => return,
134        [item] => {
135            guard.run(|| for_each(item));
136            return;
137        }
138        _ => (),
139    }
140
141    let for_each = proof.derive(for_each);
142    let mut items = for_each.derive(items);
143    rustc_thread_pool::scope(|s| {
144        let proof = items.derive(());
145
146        const MAX_GROUP_COUNT: usize = 128;
147        let group_size = items.len().div_ceil(MAX_GROUP_COUNT);
148        let groups = items.chunks_mut(group_size);
149
150        // Reverse the order of the later functions since Rayon executes them in reverse
151        // order when using a single thread. This ensures the execution order matches
152        // that of a single threaded rustc.
153        for group in groups.rev() {
154            let group = proof.derive(group);
155            s.spawn(|_| {
156                let mut group = group;
157                for i in group.iter_mut() {
158                    guard.run(|| for_each(i));
159                }
160            });
161        }
162    });
163}
164
165pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
166    t: T,
167    for_each: impl Fn(&I) + DynSync + DynSend,
168) {
169    parallel_guard(|guard| {
170        if let Some(proof) = mode::check_dyn_thread_safe() {
171            let mut items: Vec<_> = t.into_iter().collect();
172            par_slice(&mut items, guard, |i| for_each(&*i), proof)
173        } else {
174            t.into_iter().for_each(|i| {
175                guard.run(|| for_each(&i));
176            });
177        }
178    });
179}
180
181/// This runs `for_each` in parallel for each iterator item. If one or more of the
182/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned
183/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which
184/// are all equivalent.
185pub fn try_par_for_each_in<T: IntoIterator, E: DynSend>(
186    t: T,
187    for_each: impl Fn(&<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
188) -> Result<(), E>
189where
190    <T as IntoIterator>::Item: DynSend,
191{
192    parallel_guard(|guard| {
193        if let Some(proof) = mode::check_dyn_thread_safe() {
194            let mut items: Vec<_> = t.into_iter().collect();
195
196            let error = Mutex::new(None);
197
198            par_slice(
199                &mut items,
200                guard,
201                |i| {
202                    if let Err(err) = for_each(&*i) {
203                        *error.lock() = Some(err);
204                    }
205                },
206                proof,
207            );
208
209            if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) }
210        } else {
211            t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and)
212        }
213    })
214}
215
216pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
217    t: T,
218    map: impl Fn(I) -> R + DynSync + DynSend,
219) -> C {
220    parallel_guard(|guard| {
221        if let Some(proof) = mode::check_dyn_thread_safe() {
222            let map = proof.derive(map);
223
224            let mut items: Vec<(Option<I>, Option<R>)> =
225                t.into_iter().map(|i| (Some(i), None)).collect();
226
227            par_slice(
228                &mut items,
229                guard,
230                |i| {
231                    i.1 = Some(map(i.0.take().unwrap()));
232                },
233                proof,
234            );
235
236            items.into_iter().filter_map(|i| i.1).collect()
237        } else {
238            t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
239        }
240    })
241}
242
243pub fn broadcast<R: DynSend>(op: impl Fn(usize) -> R + DynSync) -> Vec<R> {
244    if let Some(proof) = mode::check_dyn_thread_safe() {
245        let op = proof.derive(op);
246        let results = rustc_thread_pool::broadcast(|context| op.derive(op(context.index())));
247        results.into_iter().map(|r| r.into_inner()).collect()
248    } else {
249        ::alloc::boxed::box_assume_init_into_vec_unsafe(::alloc::intrinsics::write_box_via_move(::alloc::boxed::Box::new_uninit(),
        [op(0)]))vec![op(0)]
250    }
251}