std/thread/scoped.rs
1use super::Result;
2use super::builder::Builder;
3use super::current::current_or_unnamed;
4use super::lifecycle::{JoinInner, spawn_unchecked};
5use super::thread::Thread;
6use crate::marker::PhantomData;
7use crate::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
8use crate::sync::Arc;
9use crate::sync::atomic::{Atomic, AtomicBool, AtomicUsize, Ordering};
10use crate::{fmt, io};
11
/// A scope to spawn scoped threads in.
///
/// See [`scope`] for details.
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct Scope<'scope, 'env: 'scope> {
    /// Bookkeeping for this scope. A clone of this `Arc` is handed to every
    /// thread spawned through the scope.
    data: Arc<ScopeData>,
    /// Invariance over 'scope, to make sure 'scope cannot shrink,
    /// which is necessary for soundness.
    ///
    /// Without invariance, this would compile fine but be unsound:
    ///
    /// ```compile_fail,E0373
    /// std::thread::scope(|s| {
    ///     s.spawn(|| {
    ///         let a = String::from("abcd");
    ///         s.spawn(|| println!("{a:?}")); // might run after `a` is dropped
    ///     });
    /// });
    /// ```
    scope: PhantomData<&'scope mut &'scope ()>,
    /// Invariance over 'env, expressed with the same `&mut`-based marker
    /// that is used for 'scope above.
    env: PhantomData<&'env mut &'env ()>,
}
34
/// An owned permission to join on a scoped thread (block on its termination).
///
/// See [`Scope::spawn`] for details.
#[stable(feature = "scoped_threads", since = "1.63.0")]
// Newtype over the thread module's `JoinInner`; every method on the handle
// forwards to that inner value.
pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
40
/// State shared between the thread that owns a scope and the threads that
/// are spawned inside it.
pub(super) struct ScopeData {
    /// Count of threads that have been registered through
    /// `increment_num_running_threads` and not yet released through
    /// `decrement_num_running_threads`. `scope` waits for this to reach zero.
    num_running_threads: Atomic<usize>,
    /// Set when `decrement_num_running_threads` is called with `panic == true`;
    /// read by `scope` once all threads are done.
    a_thread_panicked: Atomic<bool>,
    /// Handle of the thread that called `scope`. It is unparked when the
    /// running-thread count drops to zero.
    main_thread: Thread,
}
46
47impl ScopeData {
48 pub(super) fn increment_num_running_threads(&self) {
49 // We check for 'overflow' with usize::MAX / 2, to make sure there's no
50 // chance it overflows to 0, which would result in unsoundness.
51 if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
52 // This can only reasonably happen by mem::forget()'ing a lot of ScopedJoinHandles.
53 self.overflow();
54 }
55 }
56
57 #[cold]
58 fn overflow(&self) {
59 self.decrement_num_running_threads(false);
60 panic!("too many running threads in thread scope");
61 }
62
63 pub(super) fn decrement_num_running_threads(&self, panic: bool) {
64 if panic {
65 self.a_thread_panicked.store(true, Ordering::Relaxed);
66 }
67 if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
68 self.main_thread.unpark();
69 }
70 }
71}
72
/// Creates a scope for spawning scoped threads.
///
/// The function passed to `scope` will be provided a [`Scope`] object,
/// through which scoped threads can be [spawned][`Scope::spawn`].
///
/// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
/// as the scope guarantees all threads will be joined at the end of the scope.
///
/// All threads spawned within the scope that haven't been manually joined
/// will be automatically joined before this function returns.
///
/// # Panics
///
/// If any of the automatically joined threads panicked, this function will panic.
///
/// If you want to handle panics from spawned threads,
/// [`join`][ScopedJoinHandle::join] them before the end of the scope.
///
/// # Example
///
/// ```
/// use std::thread;
///
/// let mut a = vec![1, 2, 3];
/// let mut x = 0;
///
/// thread::scope(|s| {
///     s.spawn(|| {
///         println!("hello from the first scoped thread");
///         // We can borrow `a` here.
///         dbg!(&a);
///     });
///     s.spawn(|| {
///         println!("hello from the second scoped thread");
///         // We can even mutably borrow `x` here,
///         // because no other threads are using it.
///         x += a[0] + a[2];
///     });
///     println!("hello from the main thread");
/// });
///
/// // After the scope, we can modify and access our variables again:
/// a.push(4);
/// assert_eq!(x, a.len());
/// ```
///
/// # Lifetimes
///
/// Scoped threads involve two lifetimes: `'scope` and `'env`.
///
/// The `'scope` lifetime represents the lifetime of the scope itself.
/// That is: the time during which new scoped threads may be spawned,
/// and also the time during which they might still be running.
/// Once this lifetime ends, all scoped threads are joined.
/// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts.
/// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns.
///
/// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads.
/// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`.
/// It can be as small as the call to `scope`, meaning that anything that outlives this call,
/// such as local variables defined right before the scope, can be borrowed by the scoped threads.
///
/// The `'env: 'scope` bound is part of the definition of the `Scope` type.
#[track_caller]
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn scope<'env, F, T>(f: F) -> T
where
    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
{
    // We put the `ScopeData` into an `Arc` so that other threads can finish their
    // `decrement_num_running_threads` even after this function returns.
    let scope = Scope {
        data: Arc::new(ScopeData {
            num_running_threads: AtomicUsize::new(0),
            // The calling thread is the one that gets unparked once the
            // running-thread count reaches zero.
            main_thread: current_or_unnamed(),
            a_thread_panicked: AtomicBool::new(false),
        }),
        env: PhantomData,
        scope: PhantomData,
    };

    // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
    let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));

    // Wait until all the threads are finished. The counter is re-checked after
    // every wake-up; the `Acquire` load here pairs with the `Release`
    // `fetch_sub` in `ScopeData::decrement_num_running_threads`.
    while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
        // SAFETY: this is the main thread, the handle belongs to us.
        unsafe { scope.data.main_thread.park() };
    }

    // In order of priority: re-raise a panic from `f` itself; otherwise panic
    // if `a_thread_panicked` was set; otherwise hand back the value `f` returned.
    match result {
        Err(e) => resume_unwind(e),
        Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
            panic!("a scoped thread panicked")
        }
        Ok(result) => result,
    }
}
172
impl<'scope, 'env> Scope<'scope, 'env> {
    /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
    ///
    /// Unlike non-scoped threads, threads spawned with this function may
    /// borrow non-`'static` data from outside the scope. See [`scope`] for
    /// details.
    ///
    /// The join handle provides a [`join`] method that can be used to join the spawned
    /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
    /// the panic payload.
    ///
    /// If the join handle is dropped, the spawned thread will be implicitly joined at the
    /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
    /// panic after all threads are joined.
    ///
    /// This function creates a thread with the default parameters of [`Builder`].
    /// To specify the new thread's stack size or the name, use [`Builder::spawn_scoped`].
    ///
    /// # Panics
    ///
    /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
    /// to recover from such errors.
    ///
    /// [`join`]: ScopedJoinHandle::join
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    {
        // Delegate to the fallible builder API with default settings and turn
        // a spawn failure into a panic, as documented above.
        Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
    }
}
206
207impl Builder {
208 /// Spawns a new scoped thread using the settings set through this `Builder`.
209 ///
210 /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
211 /// capture any failure to create the thread at the OS level.
212 ///
213 /// [`io::Result`]: crate::io::Result
214 ///
215 /// # Panics
216 ///
217 /// Panics if a thread name was set and it contained null bytes.
218 ///
219 /// # Example
220 ///
221 /// ```
222 /// use std::thread;
223 ///
224 /// let mut a = vec![1, 2, 3];
225 /// let mut x = 0;
226 ///
227 /// thread::scope(|s| {
228 /// thread::Builder::new()
229 /// .name("first".to_string())
230 /// .spawn_scoped(s, ||
231 /// {
232 /// println!("hello from the {:?} scoped thread", thread::current().name());
233 /// // We can borrow `a` here.
234 /// dbg!(&a);
235 /// })
236 /// .unwrap();
237 /// thread::Builder::new()
238 /// .name("second".to_string())
239 /// .spawn_scoped(s, ||
240 /// {
241 /// println!("hello from the {:?} scoped thread", thread::current().name());
242 /// // We can even mutably borrow `x` here,
243 /// // because no other threads are using it.
244 /// x += a[0] + a[2];
245 /// })
246 /// .unwrap();
247 /// println!("hello from the main thread");
248 /// });
249 ///
250 /// // After the scope, we can modify and access our variables again:
251 /// a.push(4);
252 /// assert_eq!(x, a.len());
253 /// ```
254 #[stable(feature = "scoped_threads", since = "1.63.0")]
255 pub fn spawn_scoped<'scope, 'env, F, T>(
256 self,
257 scope: &'scope Scope<'scope, 'env>,
258 f: F,
259 ) -> io::Result<ScopedJoinHandle<'scope, T>>
260 where
261 F: FnOnce() -> T + Send + 'scope,
262 T: Send + 'scope,
263 {
264 let Builder { name, stack_size, no_hooks } = self;
265 Ok(ScopedJoinHandle(unsafe {
266 spawn_unchecked(name, stack_size, no_hooks, Some(scope.data.clone()), f)
267 }?))
268 }
269}
270
impl<'scope, T> ScopedJoinHandle<'scope, T> {
    /// Extracts a handle to the underlying thread.
    ///
    /// The handle is returned by reference; the join handle itself is left
    /// untouched.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// thread::scope(|s| {
    ///     let t = s.spawn(|| {
    ///         println!("hello");
    ///     });
    ///     println!("thread id: {:?}", t.thread().id());
    /// });
    /// ```
    #[must_use]
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn thread(&self) -> &Thread {
        self.0.thread()
    }

    /// Waits for the associated thread to finish, consuming the handle.
    ///
    /// This function will return immediately if the associated thread has already finished.
    ///
    /// In terms of [atomic memory orderings], the completion of the associated
    /// thread synchronizes with this function returning.
    /// In other words, all operations performed by that thread
    /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
    /// all operations that happen after `join` returns.
    ///
    /// If the associated thread panics, [`Err`] is returned with the panic payload.
    ///
    /// [atomic memory orderings]: crate::sync::atomic
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// thread::scope(|s| {
    ///     let t = s.spawn(|| {
    ///         panic!("oh no");
    ///     });
    ///     assert!(t.join().is_err());
    /// });
    /// ```
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn join(self) -> Result<T> {
        self.0.join()
    }

    /// Checks if the associated thread has finished running its main function.
    ///
    /// `is_finished` supports implementing a non-blocking join operation, by checking
    /// `is_finished`, and calling `join` if it returns `true`. This function does not block. To
    /// block while waiting on the thread to finish, use [`join`][Self::join].
    ///
    /// This might return `true` for a brief moment after the thread's main
    /// function has returned, but before the thread itself has stopped running.
    /// However, once this returns `true`, [`join`][Self::join] can be expected
    /// to return quickly, without blocking for any significant amount of time.
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn is_finished(&self) -> bool {
        self.0.is_finished()
    }
}
338
339#[stable(feature = "scoped_threads", since = "1.63.0")]
340impl fmt::Debug for Scope<'_, '_> {
341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342 f.debug_struct("Scope")
343 .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
344 .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
345 .field("main_thread", &self.data.main_thread)
346 .finish_non_exhaustive()
347 }
348}
349
350#[stable(feature = "scoped_threads", since = "1.63.0")]
351impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
354 }
355}