//! This module defines parallel operations that are implemented in //! one way for the serial compiler, and another way the parallel compiler. #![allow(dead_code)] use parking_lot::Mutex; use std::any::Any; use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe}; #[cfg(not(parallel_compiler))] pub use disabled::*; #[cfg(parallel_compiler)] pub use enabled::*; /// A guard used to hold panics that occur during a parallel section to later by unwound. /// This is used for the parallel compiler to prevent fatal errors from non-deterministically /// hiding errors by ensuring that everything in the section has completed executing before /// continuing with unwinding. It's also used for the non-parallel code to ensure error message /// output match the parallel compiler for testing purposes. pub struct ParallelGuard { panic: Mutex>>, } impl ParallelGuard { pub fn run(&self, f: impl FnOnce() -> R) -> Option { catch_unwind(AssertUnwindSafe(f)) .map_err(|err| { *self.panic.lock() = Some(err); }) .ok() } } /// This gives access to a fresh parallel guard in the closure and will unwind any panics /// caught in it after the closure returns. #[inline] pub fn parallel_guard(f: impl FnOnce(&ParallelGuard) -> R) -> R { let guard = ParallelGuard { panic: Mutex::new(None) }; let ret = f(&guard); if let Some(panic) = guard.panic.into_inner() { resume_unwind(panic); } ret } mod disabled { use crate::sync::parallel_guard; #[macro_export] #[cfg(not(parallel_compiler))] macro_rules! parallel { ($($blocks:block),*) => {{ $crate::sync::parallel_guard(|guard| { $(guard.run(|| $blocks);)* }); }} } pub fn join(oper_a: A, oper_b: B) -> (RA, RB) where A: FnOnce() -> RA, B: FnOnce() -> RB, { let (a, b) = parallel_guard(|guard| { let a = guard.run(oper_a); let b = guard.run(oper_b); (a, b) }); (a.unwrap(), b.unwrap()) } pub fn par_for_each_in(t: T, mut for_each: impl FnMut(T::Item)) { parallel_guard(|guard| { t.into_iter().for_each(|i| { guard.run(|| for_each(i)); }); }) } pub fn par_map>( t: T, mut map: impl FnMut(<::IntoIter as Iterator>::Item) -> R, ) -> C { parallel_guard(|guard| t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()) } } #[cfg(parallel_compiler)] mod enabled { use crate::sync::{mode, parallel_guard, DynSend, DynSync, FromDyn}; /// Runs a list of blocks in parallel. The first block is executed immediately on /// the current thread. Use that for the longest running block. #[macro_export] macro_rules! parallel { (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => { parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) }; (impl $fblock:block [$($blocks:expr,)*] []) => { ::rustc_data_structures::sync::scope(|s| { $(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks); s.spawn(move |_| block.into_inner()());)* (|| $fblock)(); }); }; ($fblock:block, $($blocks:block),*) => { if rustc_data_structures::sync::is_dyn_thread_safe() { // Reverse the order of the later blocks since Rayon executes them in reverse order // when using a single thread. This ensures the execution order matches that // of a single threaded rustc. parallel!(impl $fblock [] [$($blocks),*]); } else { $crate::sync::parallel_guard(|guard| { guard.run(|| $fblock); $(guard.run(|| $blocks);)* }); } }; } // This function only works when `mode::is_dyn_thread_safe()`. pub fn scope<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend, R: DynSend, { let op = FromDyn::from(op); rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() } #[inline] pub fn join(oper_a: A, oper_b: B) -> (RA, RB) where A: FnOnce() -> RA + DynSend, B: FnOnce() -> RB + DynSend, { if mode::is_dyn_thread_safe() { let oper_a = FromDyn::from(oper_a); let oper_b = FromDyn::from(oper_b); let (a, b) = rayon::join( move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()), ); (a.into_inner(), b.into_inner()) } else { super::disabled::join(oper_a, oper_b) } } use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator}; pub fn par_for_each_in + IntoParallelIterator>( t: T, for_each: impl Fn(I) + DynSync + DynSend, ) { parallel_guard(|guard| { if mode::is_dyn_thread_safe() { let for_each = FromDyn::from(for_each); t.into_par_iter().for_each(|i| { guard.run(|| for_each(i)); }); } else { t.into_iter().for_each(|i| { guard.run(|| for_each(i)); }); } }); } pub fn par_map< I, T: IntoIterator + IntoParallelIterator, R: std::marker::Send, C: FromIterator + FromParallelIterator, >( t: T, map: impl Fn(I) -> R + DynSync + DynSend, ) -> C { parallel_guard(|guard| { if mode::is_dyn_thread_safe() { let map = FromDyn::from(map); t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect() } else { t.into_iter().filter_map(|i| guard.run(|| map(i))).collect() } }) } }