std/thread/
lifecycle.rs

1//! The inner logic for thread spawning and joining.
2
3use super::current::set_current;
4use super::id::ThreadId;
5use super::scoped::ScopeData;
6use super::thread::Thread;
7use super::{Result, spawnhook};
8use crate::cell::UnsafeCell;
9use crate::marker::PhantomData;
10use crate::mem::{ManuallyDrop, MaybeUninit};
11use crate::sync::Arc;
12use crate::sync::atomic::{Atomic, AtomicUsize, Ordering};
13use crate::sys::thread as imp;
14use crate::sys_common::{AsInner, IntoInner};
15use crate::{env, io, panic};
16
17#[cfg_attr(miri, track_caller)] // even without panics, this helps for Miri backtraces
18pub(super) unsafe fn spawn_unchecked<'scope, F, T>(
19    name: Option<String>,
20    stack_size: Option<usize>,
21    no_hooks: bool,
22    scope_data: Option<Arc<ScopeData>>,
23    f: F,
24) -> io::Result<JoinInner<'scope, T>>
25where
26    F: FnOnce() -> T,
27    F: Send,
28    T: Send,
29{
30    let stack_size = stack_size.unwrap_or_else(|| {
31        static MIN: Atomic<usize> = AtomicUsize::new(0);
32
33        match MIN.load(Ordering::Relaxed) {
34            0 => {}
35            n => return n - 1,
36        }
37
38        let amt = env::var_os("RUST_MIN_STACK")
39            .and_then(|s| s.to_str().and_then(|s| s.parse().ok()))
40            .unwrap_or(imp::DEFAULT_MIN_STACK_SIZE);
41
42        // 0 is our sentinel value, so ensure that we'll never see 0 after
43        // initialization has run
44        MIN.store(amt + 1, Ordering::Relaxed);
45        amt
46    });
47
48    let id = ThreadId::new();
49    let thread = Thread::new(id, name);
50
51    let hooks = if no_hooks {
52        spawnhook::ChildSpawnHooks::default()
53    } else {
54        spawnhook::run_spawn_hooks(&thread)
55    };
56
57    let my_packet: Arc<Packet<'scope, T>> =
58        Arc::new(Packet { scope: scope_data, result: UnsafeCell::new(None), _marker: PhantomData });
59    let their_packet = my_packet.clone();
60
61    // Pass `f` in `MaybeUninit` because actually that closure might *run longer than the lifetime of `F`*.
62    // See <https://github.com/rust-lang/rust/issues/101983> for more details.
63    // To prevent leaks we use a wrapper that drops its contents.
64    #[repr(transparent)]
65    struct MaybeDangling<T>(MaybeUninit<T>);
66    impl<T> MaybeDangling<T> {
67        fn new(x: T) -> Self {
68            MaybeDangling(MaybeUninit::new(x))
69        }
70        fn into_inner(self) -> T {
71            // Make sure we don't drop.
72            let this = ManuallyDrop::new(self);
73            // SAFETY: we are always initialized.
74            unsafe { this.0.assume_init_read() }
75        }
76    }
77    impl<T> Drop for MaybeDangling<T> {
78        fn drop(&mut self) {
79            // SAFETY: we are always initialized.
80            unsafe { self.0.assume_init_drop() };
81        }
82    }
83
84    let f = MaybeDangling::new(f);
85
86    // The entrypoint of the Rust thread, after platform-specific thread
87    // initialization is done.
88    let rust_start = move || {
89        let f = f.into_inner();
90        let try_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
91            crate::sys::backtrace::__rust_begin_short_backtrace(|| hooks.run());
92            crate::sys::backtrace::__rust_begin_short_backtrace(f)
93        }));
94        // SAFETY: `their_packet` as been built just above and moved by the
95        // closure (it is an Arc<...>) and `my_packet` will be stored in the
96        // same `JoinInner` as this closure meaning the mutation will be
97        // safe (not modify it and affect a value far away).
98        unsafe { *their_packet.result.get() = Some(try_result) };
99        // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that
100        // will call `decrement_num_running_threads` and therefore signal that this thread is
101        // done.
102        drop(their_packet);
103        // Here, the lifetime `'scope` can end. `main` keeps running for a bit
104        // after that before returning itself.
105    };
106
107    if let Some(scope_data) = &my_packet.scope {
108        scope_data.increment_num_running_threads();
109    }
110
111    // SAFETY: dynamic size and alignment of the Box remain the same. See below for why the
112    // lifetime change is justified.
113    let rust_start = unsafe {
114        Box::from_raw(Box::into_raw(Box::new(rust_start)) as *mut (dyn FnOnce() + Send + 'static))
115    };
116
117    let init = Box::new(ThreadInit { handle: thread.clone(), rust_start });
118
119    Ok(JoinInner {
120        // SAFETY:
121        //
122        // `imp::Thread::new` takes a closure with a `'static` lifetime, since it's passed
123        // through FFI or otherwise used with low-level threading primitives that have no
124        // notion of or way to enforce lifetimes.
125        //
126        // As mentioned in the `Safety` section of this function's documentation, the caller of
127        // this function needs to guarantee that the passed-in lifetime is sufficiently long
128        // for the lifetime of the thread.
129        //
130        // Similarly, the `sys` implementation must guarantee that no references to the closure
131        // exist after the thread has terminated, which is signaled by `Thread::join`
132        // returning.
133        native: unsafe { imp::Thread::new(stack_size, init)? },
134        thread,
135        packet: my_packet,
136    })
137}
138
/// The data passed to the spawned thread for thread initialization.
///
/// Any thread implementation should start a new thread by calling
/// [`ThreadInit::init`] on this before doing anything else, to ensure the
/// current thread is properly initialized and the global allocator works.
pub(crate) struct ThreadInit {
    /// Handle of the thread being started; `init` installs a clone of it as the
    /// current thread.
    pub handle: Thread,
    /// The Rust entry point, returned by `init` once initialization is done.
    pub rust_start: Box<dyn FnOnce() + Send>,
}

impl ThreadInit {
    /// Initialize the 'current thread' mechanism on this thread, returning the
    /// Rust entry point.
    ///
    /// If the handle has a name, it is also passed to `imp::set_name`.
    ///
    /// Aborts the process (via `rtabort!`, not a panic) if a current-thread
    /// handle was already set on this thread.
    pub fn init(self: Box<Self>) -> Box<dyn FnOnce() + Send> {
        // Set the current thread before any (de)allocations on the global allocator occur,
        // so that it may call std::thread::current() in its implementation. This is also
        // why we take Box<Self>, to ensure the Box is not destroyed until after this point.
        // Cloning the handle does not invoke the global allocator, it is an Arc.
        if let Err(_thread) = set_current(self.handle.clone()) {
            // The current thread should not have been set yet. Use an abort to save binary size (see #123356).
            rtabort!("current thread handle already set during thread spawn");
        }

        // Forward the thread's name, if any, to the platform layer.
        if let Some(name) = self.handle.cname() {
            imp::set_name(name);
        }

        // Moving the entry point out of the box drops the rest of `self` (the Box
        // allocation and `handle`) on return, i.e. only after `set_current` above.
        self.rust_start
    }
}
168
/// This packet is used to communicate the return value between the spawned
/// thread and the rest of the program. It is shared through an `Arc` and
/// there's no need for a mutex here because synchronization happens with `join()`
/// (the caller will never read this packet until the thread has exited).
///
/// An `Arc` to the packet is stored into a `JoinInner`, which in turn is placed
/// in `JoinHandle`.
struct Packet<'scope, T> {
    /// The scope the thread was spawned in, if any; notified from `Drop` when the
    /// packet goes away.
    scope: Option<Arc<ScopeData>>,
    /// `None` until the spawned thread stores the outcome of `catch_unwind`:
    /// `Some(Ok(value))` on normal return, `Some(Err(payload))` on panic.
    /// `join()` takes it out again.
    result: UnsafeCell<Option<Result<T>>>,
    /// Ties the packet to the `'scope` lifetime without storing a reference.
    _marker: PhantomData<Option<&'scope ScopeData>>,
}

// Due to the usage of `UnsafeCell` we need to manually implement Sync.
// The type `T` should already always be Send (otherwise the thread could not
// have been created) and the Packet is Sync because all access to the
// `UnsafeCell` is synchronized (by the `join()` boundary), and `ScopeData` is Sync.
unsafe impl<'scope, T: Send> Sync for Packet<'scope, T> {}
187
impl<'scope, T> Drop for Packet<'scope, T> {
    /// Drops any result nobody took (aborting if that drop panics) and, for a
    /// scoped thread, tells the scope that this thread is finished.
    fn drop(&mut self) {
        // If this packet was for a thread that ran in a scope, the thread
        // panicked, and nobody consumed the panic payload, we make sure
        // the scope function will panic.
        // This must be computed before `result` is cleared below.
        let unhandled_panic = matches!(self.result.get_mut(), Some(Err(_)));
        // Drop the result without causing unwinding.
        // This is only relevant for threads that aren't join()ed, as
        // join() will take the `result` and set it to None, such that
        // there is nothing left to drop here.
        // If this panics, we should handle that, because we're outside the
        // outermost `catch_unwind` of our thread.
        // We just abort in that case, since there's nothing else we can do.
        // (And even if we tried to handle it somehow, we'd also need to handle
        // the case where the panic payload we get out of it also panics on
        // drop, and so on. See issue #86027.)
        // Matching with `if let Err(_)` binds nothing, so the panic payload stays
        // in the scrutinee temporary until after the `if let` body. Because
        // `rtabort!` aborts, that payload is never dropped here. A plain
        // `.is_err()` condition would drop it before reaching the abort.
        if let Err(_) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            *self.result.get_mut() = None;
        })) {
            rtabort!("thread result panicked on drop");
        }
        // Book-keeping so the scope knows when it's done.
        if let Some(scope) = &self.scope {
            // Now that there will be no more user code running on this thread
            // that can use 'scope, mark the thread as 'finished'.
            // It's important we only do this after the `result` has been dropped,
            // since dropping it might still use things it borrowed from 'scope.
            scope.decrement_num_running_threads(unhandled_panic);
        }
    }
}
219
/// Inner representation for `JoinHandle`.
pub(super) struct JoinInner<'scope, T> {
    /// Platform thread handle returned by `imp::Thread::new`; joined in `join`.
    native: imp::Thread,
    /// The `Thread` handle of the spawned thread.
    thread: Thread,
    /// Our reference to the packet the spawned thread writes its result into.
    packet: Arc<Packet<'scope, T>>,
}
226
227impl<'scope, T> JoinInner<'scope, T> {
228    pub(super) fn is_finished(&self) -> bool {
229        Arc::strong_count(&self.packet) == 1
230    }
231
232    pub(super) fn thread(&self) -> &Thread {
233        &self.thread
234    }
235
236    pub(super) fn join(mut self) -> Result<T> {
237        self.native.join();
238        Arc::get_mut(&mut self.packet)
239            // FIXME(fuzzypixelz): returning an error instead of panicking here
240            // would require updating the documentation of
241            // `std::thread::Result`; currently we can return `Err` if and only
242            // if the thread had panicked.
243            .expect("threads should not terminate unexpectedly")
244            .result
245            .get_mut()
246            .take()
247            .unwrap()
248    }
249}
250
251impl<T> AsInner<imp::Thread> for JoinInner<'static, T> {
252    fn as_inner(&self) -> &imp::Thread {
253        &self.native
254    }
255}
256
257impl<T> IntoInner<imp::Thread> for JoinInner<'static, T> {
258    fn into_inner(self) -> imp::Thread {
259        self.native
260    }
261}