rustc_data_structures/sync/
parallel.rs

//! This module defines parallel operations that are implemented in
//! one way for the serial compiler, and another way for the parallel compiler.
3
4#![allow(dead_code)]
5
6use std::any::Any;
7use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
8
9use parking_lot::Mutex;
10use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
11
12use crate::FatalErrorMarker;
13use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
14
15/// A guard used to hold panics that occur during a parallel section to later by unwound.
16/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
17/// hiding errors by ensuring that everything in the section has completed executing before
18/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
19/// output match the parallel compiler for testing purposes.
20pub struct ParallelGuard {
21    panic: Mutex<Option<IntoDynSyncSend<Box<dyn Any + Send + 'static>>>>,
22}
23
24impl ParallelGuard {
25    pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
26        catch_unwind(AssertUnwindSafe(f))
27            .map_err(|err| {
28                let mut panic = self.panic.lock();
29                if panic.is_none() || !(*err).is::<FatalErrorMarker>() {
30                    *panic = Some(IntoDynSyncSend(err));
31                }
32            })
33            .ok()
34    }
35}
36
37/// This gives access to a fresh parallel guard in the closure and will unwind any panics
38/// caught in it after the closure returns.
39#[inline]
40pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
41    let guard = ParallelGuard { panic: Mutex::new(None) };
42    let ret = f(&guard);
43    if let Some(IntoDynSyncSend(panic)) = guard.panic.into_inner() {
44        resume_unwind(panic);
45    }
46    ret
47}
48
49fn serial_join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
50where
51    A: FnOnce() -> RA,
52    B: FnOnce() -> RB,
53{
54    let (a, b) = parallel_guard(|guard| {
55        let a = guard.run(oper_a);
56        let b = guard.run(oper_b);
57        (a, b)
58    });
59    (a.unwrap(), b.unwrap())
60}
61
/// Runs a list of blocks in parallel. The first block is executed immediately on
/// the current thread. Use that for the longest running block.
///
/// Every block runs under a shared parallel guard, so a panic in one block does not
/// prevent the others from running; a caught panic is resumed once all blocks have
/// finished. The values the blocks evaluate to are discarded.
///
/// The `impl` arms are internal. They are reached through `$crate::parallel!`, so the
/// macro also expands correctly when it is invoked by path instead of through an
/// import, or when the caller has a different `parallel!` macro in scope.
#[macro_export]
macro_rules! parallel {
    // Internal: move the next pending block to the front of the accumulator. Once all
    // pending blocks are consumed, the accumulator holds them in reverse order.
    (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
        $crate::parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
    };
    // Internal: nothing left to reverse. Spawn every accumulated block on the scope,
    // then run the first block.
    (impl $fblock:block [$($blocks:expr,)*] []) => {
        $crate::sync::parallel_guard(|guard| {
            $crate::sync::scope(|s| {
                $(
                    let block = $crate::sync::FromDyn::from(|| $blocks);
                    s.spawn(move |_| {
                        guard.run(move || block.into_inner()());
                    });
                )*
                guard.run(|| $fblock);
            });
        });
    };
    // Public entry point: a first block followed by a comma-separated list of blocks.
    ($fblock:block, $($blocks:block),*) => {
        if $crate::sync::is_dyn_thread_safe() {
            // Reverse the order of the later blocks since Rayon executes them in reverse order
            // when using a single thread. This ensures the execution order matches that
            // of a single threaded rustc.
            $crate::parallel!(impl $fblock [] [$($blocks),*]);
        } else {
            // Serial mode: run the blocks one after another in source order.
            $crate::sync::parallel_guard(|guard| {
                guard.run(|| $fblock);
                $(guard.run(|| $blocks);)*
            });
        }
    };
}
96
97// This function only works when `mode::is_dyn_thread_safe()`.
98pub fn scope<'scope, OP, R>(op: OP) -> R
99where
100    OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
101    R: DynSend,
102{
103    let op = FromDyn::from(op);
104    rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
105}
106
107#[inline]
108pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
109where
110    A: FnOnce() -> RA + DynSend,
111    B: FnOnce() -> RB + DynSend,
112{
113    if mode::is_dyn_thread_safe() {
114        let oper_a = FromDyn::from(oper_a);
115        let oper_b = FromDyn::from(oper_b);
116        let (a, b) = parallel_guard(|guard| {
117            rayon::join(
118                move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
119                move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
120            )
121        });
122        (a.unwrap().into_inner(), b.unwrap().into_inner())
123    } else {
124        serial_join(oper_a, oper_b)
125    }
126}
127
128pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
129    t: T,
130    for_each: impl Fn(I) + DynSync + DynSend,
131) {
132    parallel_guard(|guard| {
133        if mode::is_dyn_thread_safe() {
134            let for_each = FromDyn::from(for_each);
135            t.into_par_iter().for_each(|i| {
136                guard.run(|| for_each(i));
137            });
138        } else {
139            t.into_iter().for_each(|i| {
140                guard.run(|| for_each(i));
141            });
142        }
143    });
144}
145
146pub fn try_par_for_each_in<
147    T: IntoIterator + IntoParallelIterator<Item = <T as IntoIterator>::Item>,
148    E: Send,
149>(
150    t: T,
151    for_each: impl Fn(<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
152) -> Result<(), E> {
153    parallel_guard(|guard| {
154        if mode::is_dyn_thread_safe() {
155            let for_each = FromDyn::from(for_each);
156            t.into_par_iter()
157                .filter_map(|i| guard.run(|| for_each(i)))
158                .reduce(|| Ok(()), Result::and)
159        } else {
160            t.into_iter().filter_map(|i| guard.run(|| for_each(i))).fold(Ok(()), Result::and)
161        }
162    })
163}
164
165pub fn par_map<
166    I,
167    T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
168    R: std::marker::Send,
169    C: FromIterator<R> + FromParallelIterator<R>,
170>(
171    t: T,
172    map: impl Fn(I) -> R + DynSync + DynSend,
173) -> C {
174    parallel_guard(|guard| {
175        if mode::is_dyn_thread_safe() {
176            let map = FromDyn::from(map);
177            t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
178        } else {
179            t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
180        }
181    })
182}