miri/concurrency/
thread.rs

1//! Implements threads.
2
3use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18
19use crate::concurrency::GlobalDataRaceHandler;
20use crate::shims::tls;
21use crate::*;
22
23#[derive(Clone, Copy, Debug, PartialEq)]
24enum SchedulingAction {
25    /// Execute step on the active thread.
26    ExecuteStep,
27    /// Execute a timeout callback.
28    ExecuteTimeoutCallback,
29    /// Wait for a bit, until there is a timeout to be called.
30    Sleep(Duration),
31}
32
33/// What to do with TLS allocations from terminated threads
34#[derive(Clone, Copy, Debug, PartialEq)]
35pub enum TlsAllocAction {
36    /// Deallocate backing memory of thread-local statics as usual
37    Deallocate,
38    /// Skip deallocating backing memory of thread-local statics and consider all memory reachable
39    /// from them as "allowed to leak" (like global `static`s).
40    Leak,
41}
42
43/// The argument type for the "unblock" callback, indicating why the thread got unblocked.
44#[derive(Clone, Copy, Debug, PartialEq)]
45pub enum UnblockKind {
46    /// Operation completed successfully, thread continues normal execution.
47    Ready,
48    /// The operation did not complete within its specified duration.
49    TimedOut,
50}
51
52/// Type alias for unblock callbacks, i.e. machine callbacks invoked when
53/// a thread gets unblocked.
54pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
55
56/// A thread identifier.
57#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
58pub struct ThreadId(u32);
59
60impl ThreadId {
61    pub fn to_u32(self) -> u32 {
62        self.0
63    }
64
65    /// Create a new thread id from a `u32` without checking if this thread exists.
66    pub fn new_unchecked(id: u32) -> Self {
67        Self(id)
68    }
69
70    pub const MAIN_THREAD: ThreadId = ThreadId(0);
71}
72
73impl Idx for ThreadId {
74    fn new(idx: usize) -> Self {
75        ThreadId(u32::try_from(idx).unwrap())
76    }
77
78    fn index(self) -> usize {
79        usize::try_from(self.0).unwrap()
80    }
81}
82
83impl From<ThreadId> for u64 {
84    fn from(t: ThreadId) -> Self {
85        t.0.into()
86    }
87}
88
89/// Keeps track of what the thread is blocked on.
90#[derive(Debug, Copy, Clone, PartialEq, Eq)]
91pub enum BlockReason {
92    /// The thread tried to join the specified thread and is blocked until that
93    /// thread terminates.
94    Join(ThreadId),
95    /// Waiting for time to pass.
96    Sleep,
97    /// Blocked on a mutex.
98    Mutex,
99    /// Blocked on a condition variable.
100    Condvar,
101    /// Blocked on a reader-writer lock.
102    RwLock,
103    /// Blocked on a Futex variable.
104    Futex,
105    /// Blocked on an InitOnce.
106    InitOnce,
107    /// Blocked on epoll.
108    Epoll,
109    /// Blocked on eventfd.
110    Eventfd,
111    /// Blocked on unnamed_socket.
112    UnnamedSocket,
113}
114
115/// The state of a thread.
116enum ThreadState<'tcx> {
117    /// The thread is enabled and can be executed.
118    Enabled,
119    /// The thread is blocked on something.
120    Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
121    /// The thread has terminated its execution. We do not delete terminated
122    /// threads (FIXME: why?).
123    Terminated,
124}
125
126impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        match self {
129            Self::Enabled => write!(f, "Enabled"),
130            Self::Blocked { reason, timeout, .. } =>
131                f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
132            Self::Terminated => write!(f, "Terminated"),
133        }
134    }
135}
136
137impl<'tcx> ThreadState<'tcx> {
138    fn is_enabled(&self) -> bool {
139        matches!(self, ThreadState::Enabled)
140    }
141
142    fn is_terminated(&self) -> bool {
143        matches!(self, ThreadState::Terminated)
144    }
145
146    fn is_blocked_on(&self, reason: BlockReason) -> bool {
147        matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
148    }
149}
150
151/// The join status of a thread.
152#[derive(Debug, Copy, Clone, PartialEq, Eq)]
153enum ThreadJoinStatus {
154    /// The thread can be joined.
155    Joinable,
156    /// A thread is detached if its join handle was destroyed and no other
157    /// thread can join it.
158    Detached,
159    /// The thread was already joined by some thread and cannot be joined again.
160    Joined,
161}
162
163/// A thread.
164pub struct Thread<'tcx> {
165    state: ThreadState<'tcx>,
166
167    /// Name of the thread.
168    thread_name: Option<Vec<u8>>,
169
170    /// The virtual call stack.
171    stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
172
173    /// The function to call when the stack ran empty, to figure out what to do next.
174    /// Conceptually, this is the interpreter implementation of the things that happen 'after' the
175    /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime).
176    /// (`None` is an error, it means the callback has not been set up yet or is actively running.)
177    pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
178
179    /// The index of the topmost user-relevant frame in `stack`. This field must contain
180    /// the value produced by `get_top_user_relevant_frame`.
181    /// The `None` state here represents
182    /// This field is a cache to reduce how often we call that method. The cache is manually
183    /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`.
184    top_user_relevant_frame: Option<usize>,
185
186    /// The join status.
187    join_status: ThreadJoinStatus,
188
189    /// Stack of active unwind payloads for the current thread. Used for storing
190    /// the argument of the call to `miri_start_unwind` (the payload) when unwinding.
191    /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`.
192    ///
193    /// In real unwinding, the payload gets passed as an argument to the landing pad,
194    /// which then forwards it to 'Resume'. However this argument is implicit in MIR,
195    /// so we have to store it out-of-band. When there are multiple active unwinds,
196    /// the innermost one is always caught first, so we can store them as a stack.
197    pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
198
199    /// Last OS error location in memory. It is a 32-bit integer.
200    pub(crate) last_error: Option<MPlaceTy<'tcx>>,
201}
202
203pub type StackEmptyCallback<'tcx> =
204    Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
205
206impl<'tcx> Thread<'tcx> {
207    /// Get the name of the current thread if it was set.
208    fn thread_name(&self) -> Option<&[u8]> {
209        self.thread_name.as_deref()
210    }
211
212    /// Return whether this thread is enabled or not.
213    pub fn is_enabled(&self) -> bool {
214        self.state.is_enabled()
215    }
216
217    /// Get the name of the current thread for display purposes; will include thread ID if not set.
218    fn thread_display_name(&self, id: ThreadId) -> String {
219        if let Some(ref thread_name) = self.thread_name {
220            String::from_utf8_lossy(thread_name).into_owned()
221        } else {
222            format!("unnamed-{}", id.index())
223        }
224    }
225
226    /// Return the top user-relevant frame, if there is one. `skip` indicates how many top frames
227    /// should be skipped.
228    /// Note that the choice to return `None` here when there is no user-relevant frame is part of
229    /// justifying the optimization that only pushes of user-relevant frames require updating the
230    /// `top_user_relevant_frame` field.
231    fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
232        self.stack
233            .iter()
234            .enumerate()
235            .rev()
236            .skip(skip)
237            .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
238    }
239
240    /// Re-compute the top user-relevant frame from scratch. `skip` indicates how many top frames
241    /// should be skipped.
242    pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
243        self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
244    }
245
246    /// Set the top user-relevant frame to the given value. Must be equal to what
247    /// `get_top_user_relevant_frame` would return!
248    pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
249        debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
250        self.top_user_relevant_frame = Some(frame_idx);
251    }
252
253    /// Returns the topmost frame that is considered user-relevant, or the
254    /// top of the stack if there is no such frame, or `None` if the stack is empty.
255    pub fn top_user_relevant_frame(&self) -> Option<usize> {
256        debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
257        // This can be called upon creation of an allocation. We create allocations while setting up
258        // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle
259        // empty stacks.
260        self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
261    }
262
263    pub fn current_span(&self) -> Span {
264        self.top_user_relevant_frame()
265            .map(|frame_idx| self.stack[frame_idx].current_span())
266            .unwrap_or(rustc_span::DUMMY_SP)
267    }
268}
269
270impl<'tcx> std::fmt::Debug for Thread<'tcx> {
271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272        write!(
273            f,
274            "{}({:?}, {:?})",
275            String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
276            self.state,
277            self.join_status
278        )
279    }
280}
281
282impl<'tcx> Thread<'tcx> {
283    fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
284        Self {
285            state: ThreadState::Enabled,
286            thread_name: name.map(|name| Vec::from(name.as_bytes())),
287            stack: Vec::new(),
288            top_user_relevant_frame: None,
289            join_status: ThreadJoinStatus::Joinable,
290            unwind_payloads: Vec::new(),
291            last_error: None,
292            on_stack_empty,
293        }
294    }
295}
296
297impl VisitProvenance for Thread<'_> {
298    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
299        let Thread {
300            unwind_payloads: panic_payload,
301            last_error,
302            stack,
303            top_user_relevant_frame: _,
304            state: _,
305            thread_name: _,
306            join_status: _,
307            on_stack_empty: _, // we assume the closure captures no GC-relevant state
308        } = self;
309
310        for payload in panic_payload {
311            payload.visit_provenance(visit);
312        }
313        last_error.visit_provenance(visit);
314        for frame in stack {
315            frame.visit_provenance(visit)
316        }
317    }
318}
319
320impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
321    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
322        let Frame {
323            return_place,
324            locals,
325            extra,
326            // There are some private fields we cannot access; they contain no tags.
327            ..
328        } = self;
329
330        // Return place.
331        return_place.visit_provenance(visit);
332        // Locals.
333        for local in locals.iter() {
334            match local.as_mplace_or_imm() {
335                None => {}
336                Some(Either::Left((ptr, meta))) => {
337                    ptr.visit_provenance(visit);
338                    meta.visit_provenance(visit);
339                }
340                Some(Either::Right(imm)) => {
341                    imm.visit_provenance(visit);
342                }
343            }
344        }
345
346        extra.visit_provenance(visit);
347    }
348}
349
350/// The moment in time when a blocked thread should be woken up.
351#[derive(Debug)]
352enum Timeout {
353    Monotonic(Instant),
354    RealTime(SystemTime),
355}
356
357impl Timeout {
358    /// How long do we have to wait from now until the specified time?
359    fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
360        match self {
361            Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
362            Timeout::RealTime(time) =>
363                time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
364        }
365    }
366
367    /// Will try to add `duration`, but if that overflows it may add less.
368    fn add_lossy(&self, duration: Duration) -> Self {
369        match self {
370            Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
371            Timeout::RealTime(s) => {
372                // If this overflows, try adding just 1h and assume that will not overflow.
373                Timeout::RealTime(
374                    s.checked_add(duration)
375                        .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
376                )
377            }
378        }
379    }
380}
381
382/// The clock to use for the timeout you are asking for.
383#[derive(Debug, Copy, Clone, PartialEq)]
384pub enum TimeoutClock {
385    Monotonic,
386    RealTime,
387}
388
389/// Whether the timeout is relative or absolute.
390#[derive(Debug, Copy, Clone)]
391pub enum TimeoutAnchor {
392    Relative,
393    Absolute,
394}
395
396/// An error signaling that the requested thread doesn't exist.
397#[derive(Debug, Copy, Clone)]
398pub struct ThreadNotFound;
399
400/// A set of threads.
401#[derive(Debug)]
402pub struct ThreadManager<'tcx> {
403    /// Identifier of the currently active thread.
404    active_thread: ThreadId,
405    /// Threads used in the program.
406    ///
407    /// Note that this vector also contains terminated threads.
408    threads: IndexVec<ThreadId, Thread<'tcx>>,
409    /// A mapping from a thread-local static to the thread specific allocation.
410    thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
411    /// A flag that indicates that we should change the active thread.
412    /// Completely ignored in GenMC mode.
413    yield_active_thread: bool,
414    /// A flag that indicates that we should do round robin scheduling of threads else randomized scheduling is used.
415    fixed_scheduling: bool,
416}
417
418impl VisitProvenance for ThreadManager<'_> {
419    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
420        let ThreadManager {
421            threads,
422            thread_local_allocs,
423            active_thread: _,
424            yield_active_thread: _,
425            fixed_scheduling: _,
426        } = self;
427
428        for thread in threads {
429            thread.visit_provenance(visit);
430        }
431        for ptr in thread_local_allocs.values() {
432            ptr.visit_provenance(visit);
433        }
434    }
435}
436
437impl<'tcx> ThreadManager<'tcx> {
438    pub(crate) fn new(config: &MiriConfig) -> Self {
439        let mut threads = IndexVec::new();
440        // Create the main thread and add it to the list of threads.
441        threads.push(Thread::new(Some("main"), None));
442        Self {
443            active_thread: ThreadId::MAIN_THREAD,
444            threads,
445            thread_local_allocs: Default::default(),
446            yield_active_thread: false,
447            fixed_scheduling: config.fixed_scheduling,
448        }
449    }
450
451    pub(crate) fn init(
452        ecx: &mut MiriInterpCx<'tcx>,
453        on_main_stack_empty: StackEmptyCallback<'tcx>,
454    ) {
455        ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
456            Some(on_main_stack_empty);
457        if ecx.tcx.sess.target.os.as_ref() != "windows" {
458            // The main thread can *not* be joined on except on windows.
459            ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
460                ThreadJoinStatus::Detached;
461        }
462    }
463
464    pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
465        if let Ok(id) = id.try_into()
466            && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
467        {
468            Ok(ThreadId(id))
469        } else {
470            Err(ThreadNotFound)
471        }
472    }
473
474    /// Check if we have an allocation for the given thread local static for the
475    /// active thread.
476    fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
477        self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
478    }
479
480    /// Set the pointer for the allocation of the given thread local
481    /// static for the active thread.
482    ///
483    /// Panics if a thread local is initialized twice for the same thread.
484    fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
485        self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
486    }
487
488    /// Borrow the stack of the active thread.
489    pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
490        &self.threads[self.active_thread].stack
491    }
492
493    /// Mutably borrow the stack of the active thread.
494    pub fn active_thread_stack_mut(
495        &mut self,
496    ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
497        &mut self.threads[self.active_thread].stack
498    }
499
500    pub fn all_stacks(
501        &self,
502    ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
503        self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
504    }
505
506    /// Create a new thread and returns its id.
507    fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
508        let new_thread_id = ThreadId::new(self.threads.len());
509        self.threads.push(Thread::new(None, Some(on_stack_empty)));
510        new_thread_id
511    }
512
513    /// Set an active thread and return the id of the thread that was active before.
514    fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
515        assert!(id.index() < self.threads.len());
516        info!(
517            "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
518            self.get_thread_display_name(id),
519            self.get_thread_display_name(self.active_thread)
520        );
521        std::mem::replace(&mut self.active_thread, id)
522    }
523
524    /// Get the id of the currently active thread.
525    pub fn active_thread(&self) -> ThreadId {
526        self.active_thread
527    }
528
529    /// Get the total number of threads that were ever spawn by this program.
530    pub fn get_total_thread_count(&self) -> usize {
531        self.threads.len()
532    }
533
534    /// Get the total of threads that are currently live, i.e., not yet terminated.
535    /// (They might be blocked.)
536    pub fn get_live_thread_count(&self) -> usize {
537        self.threads.iter().filter(|t| !t.state.is_terminated()).count()
538    }
539
540    /// Has the given thread terminated?
541    fn has_terminated(&self, thread_id: ThreadId) -> bool {
542        self.threads[thread_id].state.is_terminated()
543    }
544
545    /// Have all threads terminated?
546    fn have_all_terminated(&self) -> bool {
547        self.threads.iter().all(|thread| thread.state.is_terminated())
548    }
549
550    /// Enable the thread for execution. The thread must be terminated.
551    fn enable_thread(&mut self, thread_id: ThreadId) {
552        assert!(self.has_terminated(thread_id));
553        self.threads[thread_id].state = ThreadState::Enabled;
554    }
555
556    /// Get a mutable borrow of the currently active thread.
557    pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
558        &mut self.threads[self.active_thread]
559    }
560
561    /// Get a shared borrow of the currently active thread.
562    pub fn active_thread_ref(&self) -> &Thread<'tcx> {
563        &self.threads[self.active_thread]
564    }
565
566    /// Mark the thread as detached, which means that no other thread will try
567    /// to join it and the thread is responsible for cleaning up.
568    ///
569    /// `allow_terminated_joined` allows detaching joined threads that have already terminated.
570    /// This matches Windows's behavior for `CloseHandle`.
571    ///
572    /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>:
573    /// > The handle is valid until closed, even after the thread it represents has been terminated.
574    fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
575        trace!("detaching {:?}", id);
576
577        let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
578            // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
579            self.threads[id].join_status == ThreadJoinStatus::Detached
580        } else {
581            self.threads[id].join_status != ThreadJoinStatus::Joinable
582        };
583        if is_ub {
584            throw_ub_format!("trying to detach thread that was already detached or joined");
585        }
586
587        self.threads[id].join_status = ThreadJoinStatus::Detached;
588        interp_ok(())
589    }
590
591    /// Set the name of the given thread.
592    pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
593        self.threads[thread].thread_name = Some(new_thread_name);
594    }
595
596    /// Get the name of the given thread.
597    pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
598        self.threads[thread].thread_name()
599    }
600
601    pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
602        self.threads[thread].thread_display_name(thread)
603    }
604
605    /// Put the thread into the blocked state.
606    fn block_thread(
607        &mut self,
608        reason: BlockReason,
609        timeout: Option<Timeout>,
610        callback: DynUnblockCallback<'tcx>,
611    ) {
612        let state = &mut self.threads[self.active_thread].state;
613        assert!(state.is_enabled());
614        *state = ThreadState::Blocked { reason, timeout, callback }
615    }
616
617    /// Change the active thread to some enabled thread.
618    fn yield_active_thread(&mut self) {
619        // We do not yield immediately, as swapping out the current stack while executing a MIR statement
620        // could lead to all sorts of confusion.
621        // We should only switch stacks between steps.
622        self.yield_active_thread = true;
623    }
624
625    /// Get the wait time for the next timeout, or `None` if no timeout is pending.
626    fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
627        self.threads
628            .iter()
629            .filter_map(|t| {
630                match &t.state {
631                    ThreadState::Blocked { timeout: Some(timeout), .. } =>
632                        Some(timeout.get_wait_time(clock)),
633                    _ => None,
634                }
635            })
636            .min()
637    }
638}
639
640impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
641trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
642    /// Execute a timeout callback on the callback's thread.
643    #[inline]
644    fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
645        let this = self.eval_context_mut();
646        let mut found_callback = None;
647        // Find a blocked thread that has timed out.
648        for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
649            match &thread.state {
650                ThreadState::Blocked { timeout: Some(timeout), .. }
651                    if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
652                {
653                    let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
654                    let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
655                    found_callback = Some((id, callback));
656                    // Run the fallback (after the loop because borrow-checking).
657                    break;
658                }
659                _ => {}
660            }
661        }
662        if let Some((thread, callback)) = found_callback {
663            // This back-and-forth with `set_active_thread` is here because of two
664            // design decisions:
665            // 1. Make the caller and not the callback responsible for changing
666            //    thread.
667            // 2. Make the scheduler the only place that can change the active
668            //    thread.
669            let old_thread = this.machine.threads.set_active_thread_id(thread);
670            callback.call(this, UnblockKind::TimedOut)?;
671            this.machine.threads.set_active_thread_id(old_thread);
672        }
673        // found_callback can remain None if the computer's clock
674        // was shifted after calling the scheduler and before the call
675        // to get_ready_callback (see issue
676        // https://github.com/rust-lang/miri/issues/1763). In this case,
677        // just do nothing, which effectively just returns to the
678        // scheduler.
679        interp_ok(())
680    }
681
682    #[inline]
683    fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
684        let this = self.eval_context_mut();
685        let mut callback = this
686            .active_thread_mut()
687            .on_stack_empty
688            .take()
689            .expect("`on_stack_empty` not set up, or already running");
690        let res = callback(this)?;
691        this.active_thread_mut().on_stack_empty = Some(callback);
692        interp_ok(res)
693    }
694
695    /// Decide which action to take next and on which thread.
696    ///
697    /// The currently implemented scheduling policy is the one that is commonly
698    /// used in stateless model checkers such as Loom: run the active thread as
699    /// long as we can and switch only when we have to (the active thread was
700    /// blocked, terminated, or has explicitly asked to be preempted).
701    ///
702    /// If GenMC mode is active, the scheduling is instead handled by GenMC.
703    fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
704        let this = self.eval_context_mut();
705
706        // In GenMC mode, we let GenMC do the scheduling.
707        if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
708            let next_thread_id = genmc_ctx.schedule_thread(this)?;
709
710            let thread_manager = &mut this.machine.threads;
711            thread_manager.active_thread = next_thread_id;
712
713            assert!(thread_manager.threads[thread_manager.active_thread].state.is_enabled());
714            return interp_ok(SchedulingAction::ExecuteStep);
715        }
716
717        // We are not in GenMC mode, so we control the scheduling.
718        let thread_manager = &mut this.machine.threads;
719        let clock = &this.machine.monotonic_clock;
720        let rng = this.machine.rng.get_mut();
721        // This thread and the program can keep going.
722        if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
723            && !thread_manager.yield_active_thread
724        {
725            // The currently active thread is still enabled, just continue with it.
726            return interp_ok(SchedulingAction::ExecuteStep);
727        }
728        // The active thread yielded or got terminated. Let's see if there are any timeouts to take
729        // care of. We do this *before* running any other thread, to ensure that timeouts "in the
730        // past" fire before any other thread can take an action. This ensures that for
731        // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
732        // abstime has already been passed at the time of the call".
733        // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
734        let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
735        if potential_sleep_time == Some(Duration::ZERO) {
736            return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
737        }
738        // No callbacks immediately scheduled, pick a regular thread to execute.
739        // The active thread blocked or yielded. So we go search for another enabled thread.
740        // We build the list of threads by starting with the threads after the current one, followed by
741        // the threads before the current one and then the current thread itself (i.e., this iterator acts
742        // like `threads.rotate_left(self.active_thread.index() + 1)`. This ensures that if we pick the first
743        // eligible thread, we do regular round-robin scheduling, and all threads get a chance to take a step.
744        let mut threads_iter = thread_manager
745            .threads
746            .iter_enumerated()
747            .skip(thread_manager.active_thread.index() + 1)
748            .chain(
749                thread_manager
750                    .threads
751                    .iter_enumerated()
752                    .take(thread_manager.active_thread.index() + 1),
753            )
754            .filter(|(_id, thread)| thread.state.is_enabled());
755        // Pick a new thread, and switch to it.
756        let new_thread = if thread_manager.fixed_scheduling {
757            threads_iter.next()
758        } else {
759            threads_iter.choose(rng)
760        };
761
762        if let Some((id, _thread)) = new_thread {
763            if thread_manager.active_thread != id {
764                info!(
765                    "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
766                    thread_manager.get_thread_display_name(id),
767                    thread_manager.get_thread_display_name(thread_manager.active_thread)
768                );
769                thread_manager.active_thread = id;
770            }
771        }
772        // This completes the `yield`, if any was requested.
773        thread_manager.yield_active_thread = false;
774
775        if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
776            return interp_ok(SchedulingAction::ExecuteStep);
777        }
778        // We have not found a thread to execute.
779        if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
780            unreachable!("all threads terminated without the main thread terminating?!");
781        } else if let Some(sleep_time) = potential_sleep_time {
782            // All threads are currently blocked, but we have unexecuted
783            // timeout_callbacks, which may unblock some of the threads. Hence,
784            // sleep until the first callback.
785            interp_ok(SchedulingAction::Sleep(sleep_time))
786        } else {
787            throw_machine_stop!(TerminationInfo::Deadlock);
788        }
789    }
790}
791
792// Public interface to thread management.
793impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
794pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
795    #[inline]
796    fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
797        self.eval_context_ref().machine.threads.thread_id_try_from(id)
798    }
799
800    /// Get a thread-specific allocation id for the given thread-local static.
801    /// If needed, allocate a new one.
802    fn get_or_create_thread_local_alloc(
803        &mut self,
804        def_id: DefId,
805    ) -> InterpResult<'tcx, StrictPointer> {
806        let this = self.eval_context_mut();
807        let tcx = this.tcx;
808        if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
809            // We already have a thread-specific allocation id for this
810            // thread-local static.
811            interp_ok(old_alloc)
812        } else {
813            // We need to allocate a thread-specific allocation id for this
814            // thread-local static.
815            // First, we compute the initial value for this static.
816            if tcx.is_foreign_item(def_id) {
817                throw_unsup_format!("foreign thread-local statics are not supported");
818            }
819            let params = this.machine.get_default_alloc_params();
820            let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
821            // We make a full copy of this allocation.
822            let mut alloc = alloc.inner().adjust_from_tcx(
823                &this.tcx,
824                |bytes, align| {
825                    interp_ok(MiriAllocBytes::from_bytes(
826                        std::borrow::Cow::Borrowed(bytes),
827                        align,
828                        params,
829                    ))
830                },
831                |ptr| this.global_root_pointer(ptr),
832            )?;
833            // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
834            alloc.mutability = Mutability::Mut;
835            // Create a fresh allocation with this content.
836            let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
837            this.machine.threads.set_thread_local_alloc(def_id, ptr);
838            interp_ok(ptr)
839        }
840    }
841
842    /// Start a regular (non-main) thread.
843    #[inline]
844    fn start_regular_thread(
845        &mut self,
846        thread: Option<MPlaceTy<'tcx>>,
847        start_routine: Pointer,
848        start_abi: ExternAbi,
849        func_arg: ImmTy<'tcx>,
850        ret_layout: TyAndLayout<'tcx>,
851    ) -> InterpResult<'tcx, ThreadId> {
852        let this = self.eval_context_mut();
853
854        // Create the new thread
855        let new_thread_id = this.machine.threads.create_thread({
856            let mut state = tls::TlsDtorsState::default();
857            Box::new(move |m| state.on_stack_empty(m))
858        });
859        let current_span = this.machine.current_span();
860        match &mut this.machine.data_race {
861            GlobalDataRaceHandler::None => {}
862            GlobalDataRaceHandler::Vclocks(data_race) =>
863                data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
864            GlobalDataRaceHandler::Genmc(genmc_ctx) =>
865                genmc_ctx.handle_thread_create(
866                    &this.machine.threads,
867                    start_routine,
868                    &func_arg,
869                    new_thread_id,
870                )?,
871        }
872        // Write the current thread-id, switch to the next thread later
873        // to treat this write operation as occurring on the current thread.
874        if let Some(thread_info_place) = thread {
875            this.write_scalar(
876                Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
877                &thread_info_place,
878            )?;
879        }
880
881        // Finally switch to new thread so that we can push the first stackframe.
882        // After this all accesses will be treated as occurring in the new thread.
883        let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
884
885        // The child inherits its parent's cpu affinity.
886        if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
887            this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
888        }
889
890        // Perform the function pointer load in the new thread frame.
891        let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
892
893        // Note: the returned value is currently ignored (see the FIXME in
894        // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
895        // it.
896        let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
897
898        this.call_function(
899            instance,
900            start_abi,
901            &[func_arg],
902            Some(&ret_place),
903            ReturnContinuation::Stop { cleanup: true },
904        )?;
905
906        // Restore the old active thread frame.
907        this.machine.threads.set_active_thread_id(old_thread_id);
908
909        interp_ok(new_thread_id)
910    }
911
912    /// Handles thread termination of the active thread: wakes up threads joining on this one,
913    /// and deals with the thread's thread-local statics according to `tls_alloc_action`.
914    ///
915    /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
916    fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
917        let this = self.eval_context_mut();
918
919        // Mark thread as terminated.
920        let thread = this.active_thread_mut();
921        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
922        thread.state = ThreadState::Terminated;
923
924        // Deallocate TLS.
925        let gone_thread = this.active_thread();
926        {
927            let mut free_tls_statics = Vec::new();
928            this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
929                if thread != gone_thread {
930                    // A different thread, keep this static around.
931                    return true;
932                }
933                // Delete this static from the map and from memory.
934                // We cannot free directly here as we cannot use `?` in this context.
935                free_tls_statics.push(alloc_id);
936                false
937            });
938            // Now free the TLS statics.
939            for ptr in free_tls_statics {
940                match tls_alloc_action {
941                    TlsAllocAction::Deallocate =>
942                        this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
943                    TlsAllocAction::Leak =>
944                        if let Some(alloc) = ptr.provenance.get_alloc_id() {
945                            trace!(
946                                "Thread-local static leaked and stored as static root: {:?}",
947                                alloc
948                            );
949                            this.machine.static_roots.push(alloc);
950                        },
951                }
952            }
953        }
954
955        match &mut this.machine.data_race {
956            GlobalDataRaceHandler::None => {}
957            GlobalDataRaceHandler::Vclocks(data_race) =>
958                data_race.thread_terminated(&this.machine.threads),
959            GlobalDataRaceHandler::Genmc(genmc_ctx) => {
960                // Inform GenMC that the thread finished.
961                // This needs to happen once all accesses to the thread are done, including freeing any TLS statics.
962                genmc_ctx.handle_thread_finish(&this.machine.threads)
963            }
964        }
965
966        // Unblock joining threads.
967        let unblock_reason = BlockReason::Join(gone_thread);
968        let threads = &this.machine.threads.threads;
969        let joining_threads = threads
970            .iter_enumerated()
971            .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
972            .map(|(id, _)| id)
973            .collect::<Vec<_>>();
974        for thread in joining_threads {
975            this.unblock_thread(thread, unblock_reason)?;
976        }
977
978        interp_ok(())
979    }
980
981    /// Block the current thread, with an optional timeout.
982    /// The callback will be invoked when the thread gets unblocked.
983    #[inline]
984    fn block_thread(
985        &mut self,
986        reason: BlockReason,
987        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
988        callback: DynUnblockCallback<'tcx>,
989    ) {
990        let this = self.eval_context_mut();
991        if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
992            panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
993        }
994        let timeout = timeout.map(|(clock, anchor, duration)| {
995            let anchor = match clock {
996                TimeoutClock::RealTime => {
997                    assert!(
998                        this.machine.communicate(),
999                        "cannot have `RealTime` timeout with isolation enabled!"
1000                    );
1001                    Timeout::RealTime(match anchor {
1002                        TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1003                        TimeoutAnchor::Relative => SystemTime::now(),
1004                    })
1005                }
1006                TimeoutClock::Monotonic =>
1007                    Timeout::Monotonic(match anchor {
1008                        TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
1009                        TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
1010                    }),
1011            };
1012            anchor.add_lossy(duration)
1013        });
1014        this.machine.threads.block_thread(reason, timeout, callback);
1015    }
1016
1017    /// Put the blocked thread into the enabled state.
1018    /// Sanity-checks that the thread previously was blocked for the right reason.
1019    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1020        let this = self.eval_context_mut();
1021        let old_state =
1022            mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1023        let callback = match old_state {
1024            ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1025                assert_eq!(
1026                    reason, actual_reason,
1027                    "unblock_thread: thread was blocked for the wrong reason"
1028                );
1029                callback
1030            }
1031            _ => panic!("unblock_thread: thread was not blocked"),
1032        };
1033        // The callback must be executed in the previously blocked thread.
1034        let old_thread = this.machine.threads.set_active_thread_id(thread);
1035        callback.call(this, UnblockKind::Ready)?;
1036        this.machine.threads.set_active_thread_id(old_thread);
1037        interp_ok(())
1038    }
1039
1040    #[inline]
1041    fn detach_thread(
1042        &mut self,
1043        thread_id: ThreadId,
1044        allow_terminated_joined: bool,
1045    ) -> InterpResult<'tcx> {
1046        let this = self.eval_context_mut();
1047        this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1048    }
1049
1050    /// Mark that the active thread tries to join the thread with `joined_thread_id`.
1051    ///
1052    /// When the join is successful (immediately, or as soon as the joined thread finishes), `success_retval` will be written to `return_dest`.
1053    fn join_thread(
1054        &mut self,
1055        joined_thread_id: ThreadId,
1056        success_retval: Scalar,
1057        return_dest: &MPlaceTy<'tcx>,
1058    ) -> InterpResult<'tcx> {
1059        let this = self.eval_context_mut();
1060        let thread_mgr = &mut this.machine.threads;
1061        if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1062            // On Windows this corresponds to joining on a closed handle.
1063            throw_ub_format!("trying to join a detached thread");
1064        }
1065
1066        fn after_join<'tcx>(
1067            this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1068            joined_thread_id: ThreadId,
1069            success_retval: Scalar,
1070            return_dest: &MPlaceTy<'tcx>,
1071        ) -> InterpResult<'tcx> {
1072            let threads = &this.machine.threads;
1073            match &mut this.machine.data_race {
1074                GlobalDataRaceHandler::None => {}
1075                GlobalDataRaceHandler::Vclocks(data_race) =>
1076                    data_race.thread_joined(threads, joined_thread_id),
1077                GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1078                    genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1079            }
1080            this.write_scalar(success_retval, return_dest)?;
1081            interp_ok(())
1082        }
1083
1084        // Mark the joined thread as being joined so that we detect if other
1085        // threads try to join it.
1086        thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1087        if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1088            trace!(
1089                "{:?} blocked on {:?} when trying to join",
1090                thread_mgr.active_thread, joined_thread_id
1091            );
1092            if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1093                genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1094            }
1095
1096            // The joined thread is still running, we need to wait for it.
1097            // Once we get unblocked, perform the appropriate synchronization and write the return value.
1098            let dest = return_dest.clone();
1099            thread_mgr.block_thread(
1100                BlockReason::Join(joined_thread_id),
1101                None,
1102                callback!(
1103                    @capture<'tcx> {
1104                        joined_thread_id: ThreadId,
1105                        dest: MPlaceTy<'tcx>,
1106                        success_retval: Scalar,
1107                    }
1108                    |this, unblock: UnblockKind| {
1109                        assert_eq!(unblock, UnblockKind::Ready);
1110                        after_join(this, joined_thread_id, success_retval, &dest)
1111                    }
1112                ),
1113            );
1114        } else {
1115            // The thread has already terminated - establish happens-before and write the return value.
1116            after_join(this, joined_thread_id, success_retval, return_dest)?;
1117        }
1118        interp_ok(())
1119    }
1120
1121    /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
1122    /// If the thread is already joined by another thread, it will throw UB.
1123    ///
1124    /// When the join is successful (immediately, or as soon as the joined thread finishes), `success_retval` will be written to `return_dest`.
1125    fn join_thread_exclusive(
1126        &mut self,
1127        joined_thread_id: ThreadId,
1128        success_retval: Scalar,
1129        return_dest: &MPlaceTy<'tcx>,
1130    ) -> InterpResult<'tcx> {
1131        let this = self.eval_context_mut();
1132        let threads = &this.machine.threads.threads;
1133        if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1134            throw_ub_format!("trying to join an already joined thread");
1135        }
1136
1137        if joined_thread_id == this.machine.threads.active_thread {
1138            throw_ub_format!("trying to join itself");
1139        }
1140
1141        // Sanity check `join_status`.
1142        assert!(
1143            threads
1144                .iter()
1145                .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1146            "this thread already has threads waiting for its termination"
1147        );
1148
1149        this.join_thread(joined_thread_id, success_retval, return_dest)
1150    }
1151
1152    #[inline]
1153    fn active_thread(&self) -> ThreadId {
1154        let this = self.eval_context_ref();
1155        this.machine.threads.active_thread()
1156    }
1157
1158    #[inline]
1159    fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1160        let this = self.eval_context_mut();
1161        this.machine.threads.active_thread_mut()
1162    }
1163
1164    #[inline]
1165    fn active_thread_ref(&self) -> &Thread<'tcx> {
1166        let this = self.eval_context_ref();
1167        this.machine.threads.active_thread_ref()
1168    }
1169
1170    #[inline]
1171    fn get_total_thread_count(&self) -> usize {
1172        let this = self.eval_context_ref();
1173        this.machine.threads.get_total_thread_count()
1174    }
1175
1176    #[inline]
1177    fn have_all_terminated(&self) -> bool {
1178        let this = self.eval_context_ref();
1179        this.machine.threads.have_all_terminated()
1180    }
1181
1182    #[inline]
1183    fn enable_thread(&mut self, thread_id: ThreadId) {
1184        let this = self.eval_context_mut();
1185        this.machine.threads.enable_thread(thread_id);
1186    }
1187
1188    #[inline]
1189    fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1190        let this = self.eval_context_ref();
1191        this.machine.threads.active_thread_stack()
1192    }
1193
1194    #[inline]
1195    fn active_thread_stack_mut<'a>(
1196        &'a mut self,
1197    ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1198        let this = self.eval_context_mut();
1199        this.machine.threads.active_thread_stack_mut()
1200    }
1201
1202    /// Set the name of the current thread. The buffer must not include the null terminator.
1203    #[inline]
1204    fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1205        self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1206    }
1207
1208    #[inline]
1209    fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1210    where
1211        'tcx: 'c,
1212    {
1213        self.eval_context_ref().machine.threads.get_thread_name(thread)
1214    }
1215
1216    #[inline]
1217    fn yield_active_thread(&mut self) {
1218        self.eval_context_mut().machine.threads.yield_active_thread();
1219    }
1220
1221    #[inline]
1222    fn maybe_preempt_active_thread(&mut self) {
1223        use rand::Rng as _;
1224
1225        let this = self.eval_context_mut();
1226        if !this.machine.threads.fixed_scheduling
1227            && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1228        {
1229            this.yield_active_thread();
1230        }
1231    }
1232
1233    /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
1234    /// termination).
1235    fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1236        let this = self.eval_context_mut();
1237        loop {
1238            if CTRL_C_RECEIVED.load(Relaxed) {
1239                this.machine.handle_abnormal_termination();
1240                throw_machine_stop!(TerminationInfo::Interrupted);
1241            }
1242            match this.schedule()? {
1243                SchedulingAction::ExecuteStep => {
1244                    if !this.step()? {
1245                        // See if this thread can do something else.
1246                        match this.run_on_stack_empty()? {
1247                            Poll::Pending => {} // keep going
1248                            Poll::Ready(()) =>
1249                                this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1250                        }
1251                    }
1252                }
1253                SchedulingAction::ExecuteTimeoutCallback => {
1254                    this.run_timeout_callback()?;
1255                }
1256                SchedulingAction::Sleep(duration) => {
1257                    this.machine.monotonic_clock.sleep(duration);
1258                }
1259            }
1260        }
1261    }
1262}