miri/concurrency/
thread.rs

1//! Implements threads.
2
3use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rustc_abi::ExternAbi;
10use rustc_const_eval::CTRL_C_RECEIVED;
11use rustc_data_structures::fx::FxHashMap;
12use rustc_hir::def_id::DefId;
13use rustc_index::{Idx, IndexVec};
14use rustc_middle::mir::Mutability;
15use rustc_middle::ty::layout::TyAndLayout;
16use rustc_span::Span;
17
18use crate::concurrency::data_race;
19use crate::shims::tls;
20use crate::*;
21
22#[derive(Clone, Copy, Debug, PartialEq)]
23enum SchedulingAction {
24    /// Execute step on the active thread.
25    ExecuteStep,
26    /// Execute a timeout callback.
27    ExecuteTimeoutCallback,
28    /// Wait for a bit, until there is a timeout to be called.
29    Sleep(Duration),
30}
31
32/// What to do with TLS allocations from terminated threads
33#[derive(Clone, Copy, Debug, PartialEq)]
34pub enum TlsAllocAction {
35    /// Deallocate backing memory of thread-local statics as usual
36    Deallocate,
37    /// Skip deallocating backing memory of thread-local statics and consider all memory reachable
38    /// from them as "allowed to leak" (like global `static`s).
39    Leak,
40}
41
42/// The argument type for the "unblock" callback, indicating why the thread got unblocked.
43#[derive(Clone, Copy, Debug, PartialEq)]
44pub enum UnblockKind {
45    /// Operation completed successfully, thread continues normal execution.
46    Ready,
47    /// The operation did not complete within its specified duration.
48    TimedOut,
49}
50
51/// Type alias for unblock callbacks, i.e. machine callbacks invoked when
52/// a thread gets unblocked.
53pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
54
55/// A thread identifier.
56#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
57pub struct ThreadId(u32);
58
59impl ThreadId {
60    pub fn to_u32(self) -> u32 {
61        self.0
62    }
63
64    /// Create a new thread id from a `u32` without checking if this thread exists.
65    pub fn new_unchecked(id: u32) -> Self {
66        Self(id)
67    }
68
69    pub const MAIN_THREAD: ThreadId = ThreadId(0);
70}
71
72impl Idx for ThreadId {
73    fn new(idx: usize) -> Self {
74        ThreadId(u32::try_from(idx).unwrap())
75    }
76
77    fn index(self) -> usize {
78        usize::try_from(self.0).unwrap()
79    }
80}
81
82impl From<ThreadId> for u64 {
83    fn from(t: ThreadId) -> Self {
84        t.0.into()
85    }
86}
87
88/// Keeps track of what the thread is blocked on.
89#[derive(Debug, Copy, Clone, PartialEq, Eq)]
90pub enum BlockReason {
91    /// The thread tried to join the specified thread and is blocked until that
92    /// thread terminates.
93    Join(ThreadId),
94    /// Waiting for time to pass.
95    Sleep,
96    /// Blocked on a mutex.
97    Mutex,
98    /// Blocked on a condition variable.
99    Condvar(CondvarId),
100    /// Blocked on a reader-writer lock.
101    RwLock(RwLockId),
102    /// Blocked on a Futex variable.
103    Futex,
104    /// Blocked on an InitOnce.
105    InitOnce(InitOnceId),
106    /// Blocked on epoll.
107    Epoll,
108    /// Blocked on eventfd.
109    Eventfd,
110    /// Blocked on unnamed_socket.
111    UnnamedSocket,
112}
113
114/// The state of a thread.
115enum ThreadState<'tcx> {
116    /// The thread is enabled and can be executed.
117    Enabled,
118    /// The thread is blocked on something.
119    Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
120    /// The thread has terminated its execution. We do not delete terminated
121    /// threads (FIXME: why?).
122    Terminated,
123}
124
125impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        match self {
128            Self::Enabled => write!(f, "Enabled"),
129            Self::Blocked { reason, timeout, .. } =>
130                f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
131            Self::Terminated => write!(f, "Terminated"),
132        }
133    }
134}
135
136impl<'tcx> ThreadState<'tcx> {
137    fn is_enabled(&self) -> bool {
138        matches!(self, ThreadState::Enabled)
139    }
140
141    fn is_terminated(&self) -> bool {
142        matches!(self, ThreadState::Terminated)
143    }
144
145    fn is_blocked_on(&self, reason: BlockReason) -> bool {
146        matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
147    }
148}
149
150/// The join status of a thread.
151#[derive(Debug, Copy, Clone, PartialEq, Eq)]
152enum ThreadJoinStatus {
153    /// The thread can be joined.
154    Joinable,
155    /// A thread is detached if its join handle was destroyed and no other
156    /// thread can join it.
157    Detached,
158    /// The thread was already joined by some thread and cannot be joined again.
159    Joined,
160}
161
162/// A thread.
163pub struct Thread<'tcx> {
164    state: ThreadState<'tcx>,
165
166    /// Name of the thread.
167    thread_name: Option<Vec<u8>>,
168
169    /// The virtual call stack.
170    stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
171
172    /// The function to call when the stack ran empty, to figure out what to do next.
173    /// Conceptually, this is the interpreter implementation of the things that happen 'after' the
174    /// Rust language entry point for this thread returns (usually implemented by the C or OS runtime).
175    /// (`None` is an error, it means the callback has not been set up yet or is actively running.)
176    pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
177
178    /// The index of the topmost user-relevant frame in `stack`. This field must contain
179    /// the value produced by `get_top_user_relevant_frame`.
180    /// The `None` state here represents
181    /// This field is a cache to reduce how often we call that method. The cache is manually
182    /// maintained inside `MiriMachine::after_stack_push` and `MiriMachine::after_stack_pop`.
183    top_user_relevant_frame: Option<usize>,
184
185    /// The join status.
186    join_status: ThreadJoinStatus,
187
188    /// Stack of active panic payloads for the current thread. Used for storing
189    /// the argument of the call to `miri_start_unwind` (the panic payload) when unwinding.
190    /// This is pointer-sized, and matches the `Payload` type in `src/libpanic_unwind/miri.rs`.
191    ///
192    /// In real unwinding, the payload gets passed as an argument to the landing pad,
193    /// which then forwards it to 'Resume'. However this argument is implicit in MIR,
194    /// so we have to store it out-of-band. When there are multiple active unwinds,
195    /// the innermost one is always caught first, so we can store them as a stack.
196    pub(crate) panic_payloads: Vec<ImmTy<'tcx>>,
197
198    /// Last OS error location in memory. It is a 32-bit integer.
199    pub(crate) last_error: Option<MPlaceTy<'tcx>>,
200}
201
202pub type StackEmptyCallback<'tcx> =
203    Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
204
205impl<'tcx> Thread<'tcx> {
206    /// Get the name of the current thread if it was set.
207    fn thread_name(&self) -> Option<&[u8]> {
208        self.thread_name.as_deref()
209    }
210
211    /// Get the name of the current thread for display purposes; will include thread ID if not set.
212    fn thread_display_name(&self, id: ThreadId) -> String {
213        if let Some(ref thread_name) = self.thread_name {
214            String::from_utf8_lossy(thread_name).into_owned()
215        } else {
216            format!("unnamed-{}", id.index())
217        }
218    }
219
220    /// Return the top user-relevant frame, if there is one.
221    /// Note that the choice to return `None` here when there is no user-relevant frame is part of
222    /// justifying the optimization that only pushes of user-relevant frames require updating the
223    /// `top_user_relevant_frame` field.
224    fn compute_top_user_relevant_frame(&self) -> Option<usize> {
225        self.stack
226            .iter()
227            .enumerate()
228            .rev()
229            .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
230    }
231
232    /// Re-compute the top user-relevant frame from scratch.
233    pub fn recompute_top_user_relevant_frame(&mut self) {
234        self.top_user_relevant_frame = self.compute_top_user_relevant_frame();
235    }
236
237    /// Set the top user-relevant frame to the given value. Must be equal to what
238    /// `get_top_user_relevant_frame` would return!
239    pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
240        debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame());
241        self.top_user_relevant_frame = Some(frame_idx);
242    }
243
244    /// Returns the topmost frame that is considered user-relevant, or the
245    /// top of the stack if there is no such frame, or `None` if the stack is empty.
246    pub fn top_user_relevant_frame(&self) -> Option<usize> {
247        debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame());
248        // This can be called upon creation of an allocation. We create allocations while setting up
249        // parts of the Rust runtime when we do not have any stack frames yet, so we need to handle
250        // empty stacks.
251        self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
252    }
253
254    pub fn current_span(&self) -> Span {
255        self.top_user_relevant_frame()
256            .map(|frame_idx| self.stack[frame_idx].current_span())
257            .unwrap_or(rustc_span::DUMMY_SP)
258    }
259}
260
261impl<'tcx> std::fmt::Debug for Thread<'tcx> {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        write!(
264            f,
265            "{}({:?}, {:?})",
266            String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
267            self.state,
268            self.join_status
269        )
270    }
271}
272
273impl<'tcx> Thread<'tcx> {
274    fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
275        Self {
276            state: ThreadState::Enabled,
277            thread_name: name.map(|name| Vec::from(name.as_bytes())),
278            stack: Vec::new(),
279            top_user_relevant_frame: None,
280            join_status: ThreadJoinStatus::Joinable,
281            panic_payloads: Vec::new(),
282            last_error: None,
283            on_stack_empty,
284        }
285    }
286}
287
288impl VisitProvenance for Thread<'_> {
289    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
290        let Thread {
291            panic_payloads: panic_payload,
292            last_error,
293            stack,
294            top_user_relevant_frame: _,
295            state: _,
296            thread_name: _,
297            join_status: _,
298            on_stack_empty: _, // we assume the closure captures no GC-relevant state
299        } = self;
300
301        for payload in panic_payload {
302            payload.visit_provenance(visit);
303        }
304        last_error.visit_provenance(visit);
305        for frame in stack {
306            frame.visit_provenance(visit)
307        }
308    }
309}
310
311impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
312    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
313        let Frame {
314            return_place,
315            locals,
316            extra,
317            // There are some private fields we cannot access; they contain no tags.
318            ..
319        } = self;
320
321        // Return place.
322        return_place.visit_provenance(visit);
323        // Locals.
324        for local in locals.iter() {
325            match local.as_mplace_or_imm() {
326                None => {}
327                Some(Either::Left((ptr, meta))) => {
328                    ptr.visit_provenance(visit);
329                    meta.visit_provenance(visit);
330                }
331                Some(Either::Right(imm)) => {
332                    imm.visit_provenance(visit);
333                }
334            }
335        }
336
337        extra.visit_provenance(visit);
338    }
339}
340
341/// The moment in time when a blocked thread should be woken up.
342#[derive(Debug)]
343enum Timeout {
344    Monotonic(Instant),
345    RealTime(SystemTime),
346}
347
348impl Timeout {
349    /// How long do we have to wait from now until the specified time?
350    fn get_wait_time(&self, clock: &Clock) -> Duration {
351        match self {
352            Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
353            Timeout::RealTime(time) =>
354                time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
355        }
356    }
357
358    /// Will try to add `duration`, but if that overflows it may add less.
359    fn add_lossy(&self, duration: Duration) -> Self {
360        match self {
361            Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
362            Timeout::RealTime(s) => {
363                // If this overflows, try adding just 1h and assume that will not overflow.
364                Timeout::RealTime(
365                    s.checked_add(duration)
366                        .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
367                )
368            }
369        }
370    }
371}
372
373/// The clock to use for the timeout you are asking for.
374#[derive(Debug, Copy, Clone)]
375pub enum TimeoutClock {
376    Monotonic,
377    RealTime,
378}
379
380/// Whether the timeout is relative or absolute.
381#[derive(Debug, Copy, Clone)]
382pub enum TimeoutAnchor {
383    Relative,
384    Absolute,
385}
386
387/// An error signaling that the requested thread doesn't exist.
388#[derive(Debug, Copy, Clone)]
389pub struct ThreadNotFound;
390
391/// A set of threads.
392#[derive(Debug)]
393pub struct ThreadManager<'tcx> {
394    /// Identifier of the currently active thread.
395    active_thread: ThreadId,
396    /// Threads used in the program.
397    ///
398    /// Note that this vector also contains terminated threads.
399    threads: IndexVec<ThreadId, Thread<'tcx>>,
400    /// A mapping from a thread-local static to the thread specific allocation.
401    thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
402    /// A flag that indicates that we should change the active thread.
403    yield_active_thread: bool,
404}
405
406impl VisitProvenance for ThreadManager<'_> {
407    fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
408        let ThreadManager {
409            threads,
410            thread_local_allocs,
411            active_thread: _,
412            yield_active_thread: _,
413        } = self;
414
415        for thread in threads {
416            thread.visit_provenance(visit);
417        }
418        for ptr in thread_local_allocs.values() {
419            ptr.visit_provenance(visit);
420        }
421    }
422}
423
424impl<'tcx> Default for ThreadManager<'tcx> {
425    fn default() -> Self {
426        let mut threads = IndexVec::new();
427        // Create the main thread and add it to the list of threads.
428        threads.push(Thread::new(Some("main"), None));
429        Self {
430            active_thread: ThreadId::MAIN_THREAD,
431            threads,
432            thread_local_allocs: Default::default(),
433            yield_active_thread: false,
434        }
435    }
436}
437
438impl<'tcx> ThreadManager<'tcx> {
439    pub(crate) fn init(
440        ecx: &mut MiriInterpCx<'tcx>,
441        on_main_stack_empty: StackEmptyCallback<'tcx>,
442    ) {
443        ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
444            Some(on_main_stack_empty);
445        if ecx.tcx.sess.target.os.as_ref() != "windows" {
446            // The main thread can *not* be joined on except on windows.
447            ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
448                ThreadJoinStatus::Detached;
449        }
450    }
451
452    pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
453        if let Ok(id) = id.try_into()
454            && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
455        {
456            Ok(ThreadId(id))
457        } else {
458            Err(ThreadNotFound)
459        }
460    }
461
462    /// Check if we have an allocation for the given thread local static for the
463    /// active thread.
464    fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
465        self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
466    }
467
468    /// Set the pointer for the allocation of the given thread local
469    /// static for the active thread.
470    ///
471    /// Panics if a thread local is initialized twice for the same thread.
472    fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
473        self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
474    }
475
476    /// Borrow the stack of the active thread.
477    pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
478        &self.threads[self.active_thread].stack
479    }
480
481    /// Mutably borrow the stack of the active thread.
482    pub fn active_thread_stack_mut(
483        &mut self,
484    ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
485        &mut self.threads[self.active_thread].stack
486    }
487
488    pub fn all_stacks(
489        &self,
490    ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
491        self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
492    }
493
494    /// Create a new thread and returns its id.
495    fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
496        let new_thread_id = ThreadId::new(self.threads.len());
497        self.threads.push(Thread::new(None, Some(on_stack_empty)));
498        new_thread_id
499    }
500
501    /// Set an active thread and return the id of the thread that was active before.
502    fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
503        assert!(id.index() < self.threads.len());
504        info!(
505            "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
506            self.get_thread_display_name(id),
507            self.get_thread_display_name(self.active_thread)
508        );
509        std::mem::replace(&mut self.active_thread, id)
510    }
511
512    /// Get the id of the currently active thread.
513    pub fn active_thread(&self) -> ThreadId {
514        self.active_thread
515    }
516
517    /// Get the total number of threads that were ever spawn by this program.
518    pub fn get_total_thread_count(&self) -> usize {
519        self.threads.len()
520    }
521
522    /// Get the total of threads that are currently live, i.e., not yet terminated.
523    /// (They might be blocked.)
524    pub fn get_live_thread_count(&self) -> usize {
525        self.threads.iter().filter(|t| !t.state.is_terminated()).count()
526    }
527
528    /// Has the given thread terminated?
529    fn has_terminated(&self, thread_id: ThreadId) -> bool {
530        self.threads[thread_id].state.is_terminated()
531    }
532
533    /// Have all threads terminated?
534    fn have_all_terminated(&self) -> bool {
535        self.threads.iter().all(|thread| thread.state.is_terminated())
536    }
537
538    /// Enable the thread for execution. The thread must be terminated.
539    fn enable_thread(&mut self, thread_id: ThreadId) {
540        assert!(self.has_terminated(thread_id));
541        self.threads[thread_id].state = ThreadState::Enabled;
542    }
543
544    /// Get a mutable borrow of the currently active thread.
545    pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
546        &mut self.threads[self.active_thread]
547    }
548
549    /// Get a shared borrow of the currently active thread.
550    pub fn active_thread_ref(&self) -> &Thread<'tcx> {
551        &self.threads[self.active_thread]
552    }
553
554    /// Mark the thread as detached, which means that no other thread will try
555    /// to join it and the thread is responsible for cleaning up.
556    ///
557    /// `allow_terminated_joined` allows detaching joined threads that have already terminated.
558    /// This matches Windows's behavior for `CloseHandle`.
559    ///
560    /// See <https://docs.microsoft.com/en-us/windows/win32/procthread/thread-handles-and-identifiers>:
561    /// > The handle is valid until closed, even after the thread it represents has been terminated.
562    fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
563        trace!("detaching {:?}", id);
564
565        let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
566            // "Detached" in particular means "not yet joined". Redundant detaching is still UB.
567            self.threads[id].join_status == ThreadJoinStatus::Detached
568        } else {
569            self.threads[id].join_status != ThreadJoinStatus::Joinable
570        };
571        if is_ub {
572            throw_ub_format!("trying to detach thread that was already detached or joined");
573        }
574
575        self.threads[id].join_status = ThreadJoinStatus::Detached;
576        interp_ok(())
577    }
578
579    /// Mark that the active thread tries to join the thread with `joined_thread_id`.
580    fn join_thread(
581        &mut self,
582        joined_thread_id: ThreadId,
583        data_race: Option<&mut data_race::GlobalState>,
584    ) -> InterpResult<'tcx> {
585        if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
586            // On Windows this corresponds to joining on a closed handle.
587            throw_ub_format!("trying to join a detached thread");
588        }
589
590        // Mark the joined thread as being joined so that we detect if other
591        // threads try to join it.
592        self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
593        if !self.threads[joined_thread_id].state.is_terminated() {
594            trace!(
595                "{:?} blocked on {:?} when trying to join",
596                self.active_thread, joined_thread_id
597            );
598            // The joined thread is still running, we need to wait for it.
599            // Unce we get unblocked, perform the appropriate synchronization.
600            self.block_thread(
601                BlockReason::Join(joined_thread_id),
602                None,
603                callback!(
604                    @capture<'tcx> {
605                        joined_thread_id: ThreadId,
606                    }
607                    |this, unblock: UnblockKind| {
608                        assert_eq!(unblock, UnblockKind::Ready);
609                        if let Some(data_race) = &mut this.machine.data_race {
610                            data_race.thread_joined(&this.machine.threads, joined_thread_id);
611                        }
612                        interp_ok(())
613                    }
614                ),
615            );
616        } else {
617            // The thread has already terminated - establish happens-before
618            if let Some(data_race) = data_race {
619                data_race.thread_joined(self, joined_thread_id);
620            }
621        }
622        interp_ok(())
623    }
624
625    /// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
626    /// If the thread is already joined by another thread, it will throw UB
627    fn join_thread_exclusive(
628        &mut self,
629        joined_thread_id: ThreadId,
630        data_race: Option<&mut data_race::GlobalState>,
631    ) -> InterpResult<'tcx> {
632        if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
633            throw_ub_format!("trying to join an already joined thread");
634        }
635
636        if joined_thread_id == self.active_thread {
637            throw_ub_format!("trying to join itself");
638        }
639
640        // Sanity check `join_status`.
641        assert!(
642            self.threads
643                .iter()
644                .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
645            "this thread already has threads waiting for its termination"
646        );
647
648        self.join_thread(joined_thread_id, data_race)
649    }
650
651    /// Set the name of the given thread.
652    pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
653        self.threads[thread].thread_name = Some(new_thread_name);
654    }
655
656    /// Get the name of the given thread.
657    pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
658        self.threads[thread].thread_name()
659    }
660
661    pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
662        self.threads[thread].thread_display_name(thread)
663    }
664
665    /// Put the thread into the blocked state.
666    fn block_thread(
667        &mut self,
668        reason: BlockReason,
669        timeout: Option<Timeout>,
670        callback: DynUnblockCallback<'tcx>,
671    ) {
672        let state = &mut self.threads[self.active_thread].state;
673        assert!(state.is_enabled());
674        *state = ThreadState::Blocked { reason, timeout, callback }
675    }
676
677    /// Change the active thread to some enabled thread.
678    fn yield_active_thread(&mut self) {
679        // We do not yield immediately, as swapping out the current stack while executing a MIR statement
680        // could lead to all sorts of confusion.
681        // We should only switch stacks between steps.
682        self.yield_active_thread = true;
683    }
684
685    /// Get the wait time for the next timeout, or `None` if no timeout is pending.
686    fn next_callback_wait_time(&self, clock: &Clock) -> Option<Duration> {
687        self.threads
688            .iter()
689            .filter_map(|t| {
690                match &t.state {
691                    ThreadState::Blocked { timeout: Some(timeout), .. } =>
692                        Some(timeout.get_wait_time(clock)),
693                    _ => None,
694                }
695            })
696            .min()
697    }
698
699    /// Decide which action to take next and on which thread.
700    ///
701    /// The currently implemented scheduling policy is the one that is commonly
702    /// used in stateless model checkers such as Loom: run the active thread as
703    /// long as we can and switch only when we have to (the active thread was
704    /// blocked, terminated, or has explicitly asked to be preempted).
705    fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
706        // This thread and the program can keep going.
707        if self.threads[self.active_thread].state.is_enabled() && !self.yield_active_thread {
708            // The currently active thread is still enabled, just continue with it.
709            return interp_ok(SchedulingAction::ExecuteStep);
710        }
711        // The active thread yielded or got terminated. Let's see if there are any timeouts to take
712        // care of. We do this *before* running any other thread, to ensure that timeouts "in the
713        // past" fire before any other thread can take an action. This ensures that for
714        // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
715        // abstime has already been passed at the time of the call".
716        // <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
717        let potential_sleep_time = self.next_callback_wait_time(clock);
718        if potential_sleep_time == Some(Duration::ZERO) {
719            return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
720        }
721        // No callbacks immediately scheduled, pick a regular thread to execute.
722        // The active thread blocked or yielded. So we go search for another enabled thread.
723        // Crucially, we start searching at the current active thread ID, rather than at 0, since we
724        // want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
725        //
726        // `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
727        // the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the
728        // active thread.
729        let threads = self
730            .threads
731            .iter_enumerated()
732            .skip(self.active_thread.index() + 1)
733            .chain(self.threads.iter_enumerated().take(self.active_thread.index()));
734        for (id, thread) in threads {
735            debug_assert_ne!(self.active_thread, id);
736            if thread.state.is_enabled() {
737                info!(
738                    "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
739                    self.get_thread_display_name(id),
740                    self.get_thread_display_name(self.active_thread)
741                );
742                self.active_thread = id;
743                break;
744            }
745        }
746        self.yield_active_thread = false;
747        if self.threads[self.active_thread].state.is_enabled() {
748            return interp_ok(SchedulingAction::ExecuteStep);
749        }
750        // We have not found a thread to execute.
751        if self.threads.iter().all(|thread| thread.state.is_terminated()) {
752            unreachable!("all threads terminated without the main thread terminating?!");
753        } else if let Some(sleep_time) = potential_sleep_time {
754            // All threads are currently blocked, but we have unexecuted
755            // timeout_callbacks, which may unblock some of the threads. Hence,
756            // sleep until the first callback.
757            interp_ok(SchedulingAction::Sleep(sleep_time))
758        } else {
759            throw_machine_stop!(TerminationInfo::Deadlock);
760        }
761    }
762}
763
764impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
765trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
766    /// Execute a timeout callback on the callback's thread.
767    #[inline]
768    fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
769        let this = self.eval_context_mut();
770        let mut found_callback = None;
771        // Find a blocked thread that has timed out.
772        for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
773            match &thread.state {
774                ThreadState::Blocked { timeout: Some(timeout), .. }
775                    if timeout.get_wait_time(&this.machine.clock) == Duration::ZERO =>
776                {
777                    let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
778                    let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
779                    found_callback = Some((id, callback));
780                    // Run the fallback (after the loop because borrow-checking).
781                    break;
782                }
783                _ => {}
784            }
785        }
786        if let Some((thread, callback)) = found_callback {
787            // This back-and-forth with `set_active_thread` is here because of two
788            // design decisions:
789            // 1. Make the caller and not the callback responsible for changing
790            //    thread.
791            // 2. Make the scheduler the only place that can change the active
792            //    thread.
793            let old_thread = this.machine.threads.set_active_thread_id(thread);
794            callback.call(this, UnblockKind::TimedOut)?;
795            this.machine.threads.set_active_thread_id(old_thread);
796        }
797        // found_callback can remain None if the computer's clock
798        // was shifted after calling the scheduler and before the call
799        // to get_ready_callback (see issue
800        // https://github.com/rust-lang/miri/issues/1763). In this case,
801        // just do nothing, which effectively just returns to the
802        // scheduler.
803        interp_ok(())
804    }
805
806    #[inline]
807    fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
808        let this = self.eval_context_mut();
809        let mut callback = this
810            .active_thread_mut()
811            .on_stack_empty
812            .take()
813            .expect("`on_stack_empty` not set up, or already running");
814        let res = callback(this)?;
815        this.active_thread_mut().on_stack_empty = Some(callback);
816        interp_ok(res)
817    }
818}
819
820// Public interface to thread management.
821impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
822pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
823    #[inline]
824    fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
825        self.eval_context_ref().machine.threads.thread_id_try_from(id)
826    }
827
828    /// Get a thread-specific allocation id for the given thread-local static.
829    /// If needed, allocate a new one.
830    fn get_or_create_thread_local_alloc(
831        &mut self,
832        def_id: DefId,
833    ) -> InterpResult<'tcx, StrictPointer> {
834        let this = self.eval_context_mut();
835        let tcx = this.tcx;
836        if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
837            // We already have a thread-specific allocation id for this
838            // thread-local static.
839            interp_ok(old_alloc)
840        } else {
841            // We need to allocate a thread-specific allocation id for this
842            // thread-local static.
843            // First, we compute the initial value for this static.
844            if tcx.is_foreign_item(def_id) {
845                throw_unsup_format!("foreign thread-local statics are not supported");
846            }
847            let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
848            // We make a full copy of this allocation.
849            let mut alloc = alloc.inner().adjust_from_tcx(
850                &this.tcx,
851                |bytes, align| {
852                    interp_ok(MiriAllocBytes::from_bytes(std::borrow::Cow::Borrowed(bytes), align))
853                },
854                |ptr| this.global_root_pointer(ptr),
855            )?;
856            // This allocation will be deallocated when the thread dies, so it is not in read-only memory.
857            alloc.mutability = Mutability::Mut;
858            // Create a fresh allocation with this content.
859            let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
860            this.machine.threads.set_thread_local_alloc(def_id, ptr);
861            interp_ok(ptr)
862        }
863    }
864
865    /// Start a regular (non-main) thread.
866    #[inline]
867    fn start_regular_thread(
868        &mut self,
869        thread: Option<MPlaceTy<'tcx>>,
870        start_routine: Pointer,
871        start_abi: ExternAbi,
872        func_arg: ImmTy<'tcx>,
873        ret_layout: TyAndLayout<'tcx>,
874    ) -> InterpResult<'tcx, ThreadId> {
875        let this = self.eval_context_mut();
876
877        // Create the new thread
878        let new_thread_id = this.machine.threads.create_thread({
879            let mut state = tls::TlsDtorsState::default();
880            Box::new(move |m| state.on_stack_empty(m))
881        });
882        let current_span = this.machine.current_span();
883        if let Some(data_race) = &mut this.machine.data_race {
884            data_race.thread_created(&this.machine.threads, new_thread_id, current_span);
885        }
886
887        // Write the current thread-id, switch to the next thread later
888        // to treat this write operation as occurring on the current thread.
889        if let Some(thread_info_place) = thread {
890            this.write_scalar(
891                Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
892                &thread_info_place,
893            )?;
894        }
895
896        // Finally switch to new thread so that we can push the first stackframe.
897        // After this all accesses will be treated as occurring in the new thread.
898        let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
899
900        // The child inherits its parent's cpu affinity.
901        if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
902            this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
903        }
904
905        // Perform the function pointer load in the new thread frame.
906        let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
907
908        // Note: the returned value is currently ignored (see the FIXME in
909        // pthread_join in shims/unix/thread.rs) because the Rust standard library does not use
910        // it.
911        let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
912
913        this.call_function(
914            instance,
915            start_abi,
916            &[func_arg],
917            Some(&ret_place),
918            StackPopCleanup::Root { cleanup: true },
919        )?;
920
921        // Restore the old active thread frame.
922        this.machine.threads.set_active_thread_id(old_thread_id);
923
924        interp_ok(new_thread_id)
925    }
926
927    /// Handles thread termination of the active thread: wakes up threads joining on this one,
928    /// and deals with the thread's thread-local statics according to `tls_alloc_action`.
929    ///
930    /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
931    fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
932        let this = self.eval_context_mut();
933        // Mark thread as terminated.
934        let thread = this.active_thread_mut();
935        assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
936        thread.state = ThreadState::Terminated;
937        if let Some(ref mut data_race) = this.machine.data_race {
938            data_race.thread_terminated(&this.machine.threads);
939        }
940        // Deallocate TLS.
941        let gone_thread = this.active_thread();
942        {
943            let mut free_tls_statics = Vec::new();
944            this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
945                if thread != gone_thread {
946                    // A different thread, keep this static around.
947                    return true;
948                }
949                // Delete this static from the map and from memory.
950                // We cannot free directly here as we cannot use `?` in this context.
951                free_tls_statics.push(alloc_id);
952                false
953            });
954            // Now free the TLS statics.
955            for ptr in free_tls_statics {
956                match tls_alloc_action {
957                    TlsAllocAction::Deallocate =>
958                        this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
959                    TlsAllocAction::Leak =>
960                        if let Some(alloc) = ptr.provenance.get_alloc_id() {
961                            trace!(
962                                "Thread-local static leaked and stored as static root: {:?}",
963                                alloc
964                            );
965                            this.machine.static_roots.push(alloc);
966                        },
967                }
968            }
969        }
970        // Unblock joining threads.
971        let unblock_reason = BlockReason::Join(gone_thread);
972        let threads = &this.machine.threads.threads;
973        let joining_threads = threads
974            .iter_enumerated()
975            .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
976            .map(|(id, _)| id)
977            .collect::<Vec<_>>();
978        for thread in joining_threads {
979            this.unblock_thread(thread, unblock_reason)?;
980        }
981
982        interp_ok(())
983    }
984
985    /// Block the current thread, with an optional timeout.
986    /// The callback will be invoked when the thread gets unblocked.
987    #[inline]
988    fn block_thread(
989        &mut self,
990        reason: BlockReason,
991        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
992        callback: DynUnblockCallback<'tcx>,
993    ) {
994        let this = self.eval_context_mut();
995        let timeout = timeout.map(|(clock, anchor, duration)| {
996            let anchor = match clock {
997                TimeoutClock::RealTime => {
998                    assert!(
999                        this.machine.communicate(),
1000                        "cannot have `RealTime` timeout with isolation enabled!"
1001                    );
1002                    Timeout::RealTime(match anchor {
1003                        TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1004                        TimeoutAnchor::Relative => SystemTime::now(),
1005                    })
1006                }
1007                TimeoutClock::Monotonic =>
1008                    Timeout::Monotonic(match anchor {
1009                        TimeoutAnchor::Absolute => this.machine.clock.epoch(),
1010                        TimeoutAnchor::Relative => this.machine.clock.now(),
1011                    }),
1012            };
1013            anchor.add_lossy(duration)
1014        });
1015        this.machine.threads.block_thread(reason, timeout, callback);
1016    }
1017
1018    /// Put the blocked thread into the enabled state.
1019    /// Sanity-checks that the thread previously was blocked for the right reason.
1020    fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1021        let this = self.eval_context_mut();
1022        let old_state =
1023            mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1024        let callback = match old_state {
1025            ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1026                assert_eq!(
1027                    reason, actual_reason,
1028                    "unblock_thread: thread was blocked for the wrong reason"
1029                );
1030                callback
1031            }
1032            _ => panic!("unblock_thread: thread was not blocked"),
1033        };
1034        // The callback must be executed in the previously blocked thread.
1035        let old_thread = this.machine.threads.set_active_thread_id(thread);
1036        callback.call(this, UnblockKind::Ready)?;
1037        this.machine.threads.set_active_thread_id(old_thread);
1038        interp_ok(())
1039    }
1040
1041    #[inline]
1042    fn detach_thread(
1043        &mut self,
1044        thread_id: ThreadId,
1045        allow_terminated_joined: bool,
1046    ) -> InterpResult<'tcx> {
1047        let this = self.eval_context_mut();
1048        this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1049    }
1050
1051    #[inline]
1052    fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
1053        let this = self.eval_context_mut();
1054        this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?;
1055        interp_ok(())
1056    }
1057
1058    #[inline]
1059    fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
1060        let this = self.eval_context_mut();
1061        this.machine
1062            .threads
1063            .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?;
1064        interp_ok(())
1065    }
1066
1067    #[inline]
1068    fn active_thread(&self) -> ThreadId {
1069        let this = self.eval_context_ref();
1070        this.machine.threads.active_thread()
1071    }
1072
1073    #[inline]
1074    fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1075        let this = self.eval_context_mut();
1076        this.machine.threads.active_thread_mut()
1077    }
1078
1079    #[inline]
1080    fn active_thread_ref(&self) -> &Thread<'tcx> {
1081        let this = self.eval_context_ref();
1082        this.machine.threads.active_thread_ref()
1083    }
1084
1085    #[inline]
1086    fn get_total_thread_count(&self) -> usize {
1087        let this = self.eval_context_ref();
1088        this.machine.threads.get_total_thread_count()
1089    }
1090
1091    #[inline]
1092    fn have_all_terminated(&self) -> bool {
1093        let this = self.eval_context_ref();
1094        this.machine.threads.have_all_terminated()
1095    }
1096
1097    #[inline]
1098    fn enable_thread(&mut self, thread_id: ThreadId) {
1099        let this = self.eval_context_mut();
1100        this.machine.threads.enable_thread(thread_id);
1101    }
1102
1103    #[inline]
1104    fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1105        let this = self.eval_context_ref();
1106        this.machine.threads.active_thread_stack()
1107    }
1108
1109    #[inline]
1110    fn active_thread_stack_mut<'a>(
1111        &'a mut self,
1112    ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1113        let this = self.eval_context_mut();
1114        this.machine.threads.active_thread_stack_mut()
1115    }
1116
1117    /// Set the name of the current thread. The buffer must not include the null terminator.
1118    #[inline]
1119    fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1120        self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1121    }
1122
1123    #[inline]
1124    fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1125    where
1126        'tcx: 'c,
1127    {
1128        self.eval_context_ref().machine.threads.get_thread_name(thread)
1129    }
1130
1131    #[inline]
1132    fn yield_active_thread(&mut self) {
1133        self.eval_context_mut().machine.threads.yield_active_thread();
1134    }
1135
1136    #[inline]
1137    fn maybe_preempt_active_thread(&mut self) {
1138        use rand::Rng as _;
1139
1140        let this = self.eval_context_mut();
1141        if this.machine.rng.get_mut().random_bool(this.machine.preemption_rate) {
1142            this.yield_active_thread();
1143        }
1144    }
1145
1146    /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
1147    /// termination).
1148    fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1149        let this = self.eval_context_mut();
1150        loop {
1151            if CTRL_C_RECEIVED.load(Relaxed) {
1152                this.machine.handle_abnormal_termination();
1153                std::process::exit(1);
1154            }
1155            match this.machine.threads.schedule(&this.machine.clock)? {
1156                SchedulingAction::ExecuteStep => {
1157                    if !this.step()? {
1158                        // See if this thread can do something else.
1159                        match this.run_on_stack_empty()? {
1160                            Poll::Pending => {} // keep going
1161                            Poll::Ready(()) =>
1162                                this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1163                        }
1164                    }
1165                }
1166                SchedulingAction::ExecuteTimeoutCallback => {
1167                    this.run_timeout_callback()?;
1168                }
1169                SchedulingAction::Sleep(duration) => {
1170                    this.machine.clock.sleep(duration);
1171                }
1172            }
1173        }
1174    }
1175}