1use std::sync::atomic::Ordering::Relaxed;
4use std::task::Poll;
5use std::time::{Duration, SystemTime};
6use std::{io, mem};
7
8use rand::seq::IteratorRandom;
9use rustc_abi::ExternAbi;
10use rustc_const_eval::CTRL_C_RECEIVED;
11use rustc_data_structures::either::Either;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::{DUMMY_SP, Span};
18use rustc_target::spec::Os;
19
20use crate::concurrency::GlobalDataRaceHandler;
21use crate::shims::tls;
22use crate::*;
23
/// What the main interpreter loop should do next, as decided by the scheduler.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SchedulingAction {
    /// Execute one step on the currently active thread.
    ExecuteStep,
    /// No thread can run right now: wait for I/O readiness (or just sleep) for at most the
    /// given duration. `None` means there is no upper bound on the wait.
    SleepAndWaitForIo(Option<Duration>),
}
34
/// What to do with the thread-local statics of a thread when that thread terminates.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TlsAllocAction {
    /// Deallocate the memory backing the thread-local statics.
    Deallocate,
    /// Keep the memory alive and record the allocations as static roots instead.
    Leak,
}
44
/// Why a blocked thread was woken up; passed to the thread's unblock callback.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum UnblockKind {
    /// The thread was unblocked explicitly (the thing it waited for happened).
    Ready,
    /// The timeout attached to the blocking operation expired first.
    TimedOut,
}
53
54pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
57
/// Identifier of a thread; a thin wrapper around a `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ThreadId(u32);

impl ThreadId {
    /// The id of the main thread.
    pub const MAIN_THREAD: ThreadId = ThreadId(0);

    /// Wraps a raw id without checking that a thread with this id actually exists.
    pub fn new_unchecked(id: u32) -> Self {
        ThreadId(id)
    }

    /// Returns the raw numeric id.
    pub fn to_u32(self) -> u32 {
        let ThreadId(raw) = self;
        raw
    }
}
74
75impl Idx for ThreadId {
76 fn new(idx: usize) -> Self {
77 ThreadId(u32::try_from(idx).unwrap())
78 }
79
80 fn index(self) -> usize {
81 usize::try_from(self.0).unwrap()
82 }
83}
84
85impl From<ThreadId> for u64 {
86 fn from(t: ThreadId) -> Self {
87 t.0.into()
88 }
89}
90
91#[derive(Debug, Copy, Clone, PartialEq, Eq)]
93pub enum BlockReason {
94 Join(ThreadId),
97 Sleep,
99 Mutex,
101 Condvar,
103 RwLock,
105 Futex,
107 InitOnce,
109 Epoll,
111 Eventfd,
113 UnnamedSocket,
115 IO,
117 Genmc,
120}
121
122enum ThreadState<'tcx> {
124 Enabled,
126 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
128 Terminated,
131}
132
133impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 match self {
136 Self::Enabled => write!(f, "Enabled"),
137 Self::Blocked { reason, timeout, .. } =>
138 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
139 Self::Terminated => write!(f, "Terminated"),
140 }
141 }
142}
143
144impl<'tcx> ThreadState<'tcx> {
145 fn is_enabled(&self) -> bool {
146 matches!(self, ThreadState::Enabled)
147 }
148
149 fn is_terminated(&self) -> bool {
150 matches!(self, ThreadState::Terminated)
151 }
152
153 fn is_blocked_on(&self, reason: BlockReason) -> bool {
154 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
155 }
156}
157
/// Whether (and how) a thread can still be joined.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// The thread has been neither detached nor joined yet.
    Joinable,
    /// The thread was detached; trying to join it is reported as undefined behavior.
    Detached,
    /// Some thread has already started joining this thread.
    Joined,
}
169
170pub struct Thread<'tcx> {
172 state: ThreadState<'tcx>,
173
174 thread_name: Option<Vec<u8>>,
176
177 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
179
180 pub(crate) origin_span: Span,
183
184 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
189
190 top_user_relevant_frame: Option<usize>,
195
196 join_status: ThreadJoinStatus,
198
199 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
208
209 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
211}
212
213pub type StackEmptyCallback<'tcx> =
214 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
215
216impl<'tcx> Thread<'tcx> {
217 fn thread_name(&self) -> Option<&[u8]> {
219 self.thread_name.as_deref()
220 }
221
222 pub fn is_enabled(&self) -> bool {
224 self.state.is_enabled()
225 }
226
227 fn thread_display_name(&self, id: ThreadId) -> String {
229 if let Some(ref thread_name) = self.thread_name {
230 String::from_utf8_lossy(thread_name).into_owned()
231 } else {
232 format!("unnamed-{}", id.index())
233 }
234 }
235
236 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
242 let mut best = None;
244 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
245 let relevance = frame.extra.user_relevance;
246 if relevance == u8::MAX {
247 return Some(idx);
249 }
250 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
251 best = Some((idx, relevance));
254 }
255 }
256 best.map(|(idx, _relevance)| idx)
257 }
258
259 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
262 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
263 }
264
265 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
268 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
269 self.top_user_relevant_frame = Some(frame_idx);
270 }
271
272 pub fn top_user_relevant_frame(&self) -> Option<usize> {
275 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
279 }
280
281 pub fn current_user_relevance(&self) -> u8 {
282 self.top_user_relevant_frame()
283 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
284 .unwrap_or(0)
285 }
286
287 pub fn current_user_relevant_span(&self) -> Span {
288 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
289 self.top_user_relevant_frame()
290 .map(|frame_idx| self.stack[frame_idx].current_span())
291 .unwrap_or(rustc_span::DUMMY_SP)
292 }
293}
294
295impl<'tcx> std::fmt::Debug for Thread<'tcx> {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 write!(
298 f,
299 "{}({:?}, {:?})",
300 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
301 self.state,
302 self.join_status
303 )
304 }
305}
306
307impl<'tcx> Thread<'tcx> {
308 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
309 Self {
310 state: ThreadState::Enabled,
311 thread_name: name.map(|name| Vec::from(name.as_bytes())),
312 stack: Vec::new(),
313 origin_span: DUMMY_SP,
314 top_user_relevant_frame: None,
315 join_status: ThreadJoinStatus::Joinable,
316 unwind_payloads: Vec::new(),
317 last_error: None,
318 on_stack_empty,
319 }
320 }
321}
322
323impl VisitProvenance for Thread<'_> {
324 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
325 let Thread {
326 unwind_payloads: panic_payload,
327 last_error,
328 stack,
329 origin_span: _,
330 top_user_relevant_frame: _,
331 state: _,
332 thread_name: _,
333 join_status: _,
334 on_stack_empty: _, } = self;
336
337 for payload in panic_payload {
338 payload.visit_provenance(visit);
339 }
340 last_error.visit_provenance(visit);
341 for frame in stack {
342 frame.visit_provenance(visit)
343 }
344 }
345}
346
347impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
348 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
349 let return_place = self.return_place();
350 let Frame {
351 locals,
352 extra,
353 ..
355 } = self;
356
357 return_place.visit_provenance(visit);
359 for local in locals.iter() {
361 match local.as_mplace_or_imm() {
362 None => {}
363 Some(Either::Left((ptr, meta))) => {
364 ptr.visit_provenance(visit);
365 meta.visit_provenance(visit);
366 }
367 Some(Either::Right(imm)) => {
368 imm.visit_provenance(visit);
369 }
370 }
371 }
372
373 extra.visit_provenance(visit);
374 }
375}
376
377#[derive(Debug)]
379enum Timeout {
380 Monotonic(Instant),
381 RealTime(SystemTime),
382}
383
384impl Timeout {
385 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
387 match self {
388 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
389 Timeout::RealTime(time) =>
390 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
391 }
392 }
393
394 fn add_lossy(&self, duration: Duration) -> Self {
396 match self {
397 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
398 Timeout::RealTime(s) => {
399 Timeout::RealTime(
401 s.checked_add(duration)
402 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
403 )
404 }
405 }
406 }
407}
408
/// The clock that a timeout is measured against.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TimeoutClock {
    /// The machine's monotonic clock.
    Monotonic,
    /// The host's system time. Only allowed when the machine communicates with the host.
    RealTime,
}
415
/// The point in time that a timeout duration is counted from.
#[derive(Debug, Clone, Copy)]
pub enum TimeoutAnchor {
    /// The duration is counted from "now" on the chosen clock.
    Relative,
    /// The duration is counted from the epoch of the chosen clock.
    Absolute,
}
422
/// Error value: the given id does not refer to an existing thread.
#[derive(Debug, Clone, Copy)]
pub struct ThreadNotFound;
426
427#[derive(Debug)]
429pub struct ThreadManager<'tcx> {
430 active_thread: ThreadId,
432 threads: IndexVec<ThreadId, Thread<'tcx>>,
436 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
438 yield_active_thread: bool,
441 fixed_scheduling: bool,
443}
444
445impl VisitProvenance for ThreadManager<'_> {
446 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
447 let ThreadManager {
448 threads,
449 thread_local_allocs,
450 active_thread: _,
451 yield_active_thread: _,
452 fixed_scheduling: _,
453 } = self;
454
455 for thread in threads {
456 thread.visit_provenance(visit);
457 }
458 for ptr in thread_local_allocs.values() {
459 ptr.visit_provenance(visit);
460 }
461 }
462}
463
464impl<'tcx> ThreadManager<'tcx> {
465 pub(crate) fn new(config: &MiriConfig) -> Self {
466 let mut threads = IndexVec::new();
467 threads.push(Thread::new(Some("main"), None));
469 Self {
470 active_thread: ThreadId::MAIN_THREAD,
471 threads,
472 thread_local_allocs: Default::default(),
473 yield_active_thread: false,
474 fixed_scheduling: config.fixed_scheduling,
475 }
476 }
477
478 pub(crate) fn init(
479 ecx: &mut MiriInterpCx<'tcx>,
480 on_main_stack_empty: StackEmptyCallback<'tcx>,
481 ) {
482 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
483 Some(on_main_stack_empty);
484 if ecx.tcx.sess.target.os != Os::Windows {
485 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
487 ThreadJoinStatus::Detached;
488 }
489 }
490
491 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
492 if let Ok(id) = id.try_into()
493 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
494 {
495 Ok(ThreadId(id))
496 } else {
497 Err(ThreadNotFound)
498 }
499 }
500
501 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
504 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
505 }
506
507 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
512 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
513 }
514
515 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
517 &self.threads[self.active_thread].stack
518 }
519
520 pub fn active_thread_stack_mut(
522 &mut self,
523 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
524 &mut self.threads[self.active_thread].stack
525 }
526
527 pub fn all_blocked_stacks(
528 &self,
529 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
530 self.threads
531 .iter_enumerated()
532 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
533 .map(|(id, t)| (id, &t.stack[..]))
534 }
535
536 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
538 let new_thread_id = ThreadId::new(self.threads.len());
539 self.threads.push(Thread::new(None, Some(on_stack_empty)));
540 new_thread_id
541 }
542
543 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
545 assert!(id.index() < self.threads.len());
546 info!(
547 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
548 self.get_thread_display_name(id),
549 self.get_thread_display_name(self.active_thread)
550 );
551 std::mem::replace(&mut self.active_thread, id)
552 }
553
554 pub fn active_thread(&self) -> ThreadId {
556 self.active_thread
557 }
558
559 pub fn get_total_thread_count(&self) -> usize {
561 self.threads.len()
562 }
563
564 pub fn get_live_thread_count(&self) -> usize {
567 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
568 }
569
570 fn has_terminated(&self, thread_id: ThreadId) -> bool {
572 self.threads[thread_id].state.is_terminated()
573 }
574
575 fn have_all_terminated(&self) -> bool {
577 self.threads.iter().all(|thread| thread.state.is_terminated())
578 }
579
580 fn enable_thread(&mut self, thread_id: ThreadId) {
582 assert!(self.has_terminated(thread_id));
583 self.threads[thread_id].state = ThreadState::Enabled;
584 }
585
586 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
588 &mut self.threads[self.active_thread]
589 }
590
591 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
593 &self.threads[self.active_thread]
594 }
595
596 pub fn thread_ref(&self, thread_id: ThreadId) -> &Thread<'tcx> {
597 &self.threads[thread_id]
598 }
599
600 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
609 trace!("detaching {:?}", id);
611
612 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
613 self.threads[id].join_status == ThreadJoinStatus::Detached
615 } else {
616 self.threads[id].join_status != ThreadJoinStatus::Joinable
617 };
618 if is_ub {
619 throw_ub_format!("trying to detach thread that was already detached or joined");
620 }
621
622 self.threads[id].join_status = ThreadJoinStatus::Detached;
623 interp_ok(())
624 }
625
626 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
628 self.threads[thread].thread_name = Some(new_thread_name);
629 }
630
631 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
633 self.threads[thread].thread_name()
634 }
635
636 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
637 self.threads[thread].thread_display_name(thread)
638 }
639
640 fn block_thread(
642 &mut self,
643 reason: BlockReason,
644 timeout: Option<Timeout>,
645 callback: DynUnblockCallback<'tcx>,
646 ) {
647 let state = &mut self.threads[self.active_thread].state;
648 assert!(state.is_enabled());
649 *state = ThreadState::Blocked { reason, timeout, callback }
650 }
651
652 fn yield_active_thread(&mut self) {
654 self.yield_active_thread = true;
658 }
659}
660
661impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
662trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
663 #[inline]
664 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
665 let this = self.eval_context_mut();
666 let active_thread = this.active_thread_mut();
667 active_thread.origin_span = DUMMY_SP; let mut callback = active_thread
669 .on_stack_empty
670 .take()
671 .expect("`on_stack_empty` not set up, or already running");
672 let res = callback(this)?;
673 this.active_thread_mut().on_stack_empty = Some(callback);
674 interp_ok(res)
675 }
676
677 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
686 let this = self.eval_context_mut();
687
688 if this.machine.data_race.as_genmc_ref().is_some() {
690 loop {
691 let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
692 let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
693 return interp_ok(SchedulingAction::ExecuteStep);
694 };
695 if this.machine.threads.threads[next_thread_id]
697 .state
698 .is_blocked_on(BlockReason::Genmc)
699 {
700 info!(
701 "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
702 );
703 this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
704 }
705 let thread_manager = &mut this.machine.threads;
708 if thread_manager.threads[next_thread_id].state.is_enabled() {
709 thread_manager.active_thread = next_thread_id;
711 return interp_ok(SchedulingAction::ExecuteStep);
712 }
713 }
714 }
715
716 let thread_manager = &this.machine.threads;
718 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
720 && !thread_manager.yield_active_thread
721 {
722 return interp_ok(SchedulingAction::ExecuteStep);
724 }
725
726 if this.machine.communicate() {
730 this.poll_and_unblock(Some(Duration::ZERO))?;
737 }
738
739 let potential_sleep_time = this.unblock_expired_timeouts()?;
745
746 let thread_manager = &mut this.machine.threads;
747 let rng = this.machine.rng.get_mut();
748
749 let mut threads_iter = thread_manager
756 .threads
757 .iter_enumerated()
758 .skip(thread_manager.active_thread.index() + 1)
759 .chain(
760 thread_manager
761 .threads
762 .iter_enumerated()
763 .take(thread_manager.active_thread.index() + 1),
764 )
765 .filter(|(_id, thread)| thread.state.is_enabled());
766 let new_thread = if thread_manager.fixed_scheduling {
768 threads_iter.next()
769 } else {
770 threads_iter.choose(rng)
771 };
772
773 if let Some((id, _thread)) = new_thread {
774 if thread_manager.active_thread != id {
775 info!(
776 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
777 thread_manager.get_thread_display_name(id),
778 thread_manager.get_thread_display_name(thread_manager.active_thread)
779 );
780 thread_manager.active_thread = id;
781 }
782 }
783 thread_manager.yield_active_thread = false;
785
786 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
787 return interp_ok(SchedulingAction::ExecuteStep);
788 }
789 if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
791 unreachable!("all threads terminated without the main thread terminating?!");
792 } else if let Some(sleep_time) = potential_sleep_time {
793 interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
797 } else if thread_manager
798 .threads
799 .iter()
800 .any(|thread| thread.state.is_blocked_on(BlockReason::IO))
801 {
802 interp_ok(SchedulingAction::SleepAndWaitForIo(None))
806 } else {
807 throw_machine_stop!(TerminationInfo::GlobalDeadlock);
808 }
809 }
810
811 fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
814 let this = self.eval_context_mut();
815
816 let ready = match this.machine.blocking_io.poll(timeout) {
817 Ok(ready) => ready,
818 Err(e) if e.kind() == io::ErrorKind::Interrupted => return interp_ok(()),
820 Err(e) => panic!("unexpected error while polling: {e}"),
823 };
824
825 ready.into_iter().try_for_each(|thread_id| this.unblock_thread(thread_id, BlockReason::IO))
826 }
827
828 fn unblock_expired_timeouts(&mut self) -> InterpResult<'tcx, Option<Duration>> {
833 let this = self.eval_context_mut();
834 let clock = &this.machine.monotonic_clock;
835
836 let mut min_wait_time = Option::<Duration>::None;
837 let mut callbacks = Vec::new();
838
839 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
840 match &thread.state {
841 ThreadState::Blocked { timeout: Some(timeout), .. } => {
842 let wait_time = timeout.get_wait_time(clock);
843 if wait_time.is_zero() {
844 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
846 let ThreadState::Blocked { callback, .. } = old_state else {
847 unreachable!()
848 };
849 callbacks.push((id, callback));
851 } else {
852 min_wait_time = Some(wait_time.min(min_wait_time.unwrap_or(Duration::MAX)));
855 }
856 }
857 _ => {}
858 }
859 }
860
861 for (thread, callback) in callbacks {
862 let old_thread = this.machine.threads.set_active_thread_id(thread);
869 callback.call(this, UnblockKind::TimedOut)?;
870 this.machine.threads.set_active_thread_id(old_thread);
871 }
872
873 interp_ok(min_wait_time)
874 }
875}
876
877impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
879pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
880 #[inline]
881 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
882 self.eval_context_ref().machine.threads.thread_id_try_from(id)
883 }
884
885 fn get_or_create_thread_local_alloc(
888 &mut self,
889 def_id: DefId,
890 ) -> InterpResult<'tcx, StrictPointer> {
891 let this = self.eval_context_mut();
892 let tcx = this.tcx;
893 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
894 interp_ok(old_alloc)
897 } else {
898 if tcx.is_foreign_item(def_id) {
902 throw_unsup_format!("foreign thread-local statics are not supported");
903 }
904 let params = this.machine.get_default_alloc_params();
905 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
906 let mut alloc = alloc.inner().adjust_from_tcx(
908 &this.tcx,
909 |bytes, align| {
910 interp_ok(MiriAllocBytes::from_bytes(
911 std::borrow::Cow::Borrowed(bytes),
912 align,
913 params,
914 ))
915 },
916 |ptr| this.global_root_pointer(ptr),
917 )?;
918 alloc.mutability = Mutability::Mut;
920 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
922 this.machine.threads.set_thread_local_alloc(def_id, ptr);
923 interp_ok(ptr)
924 }
925 }
926
927 #[inline]
929 fn start_regular_thread(
930 &mut self,
931 thread: Option<MPlaceTy<'tcx>>,
932 start_routine: Pointer,
933 start_abi: ExternAbi,
934 func_arg: ImmTy<'tcx>,
935 ret_layout: TyAndLayout<'tcx>,
936 ) -> InterpResult<'tcx, ThreadId> {
937 let this = self.eval_context_mut();
938
939 let current_span = this.machine.current_user_relevant_span();
941 let new_thread_id = this.machine.threads.create_thread({
942 let mut state = tls::TlsDtorsState::default();
943 Box::new(move |m| state.on_stack_empty(m))
944 });
945 match &mut this.machine.data_race {
946 GlobalDataRaceHandler::None => {}
947 GlobalDataRaceHandler::Vclocks(data_race) =>
948 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
949 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
950 genmc_ctx.handle_thread_create(
951 &this.machine.threads,
952 start_routine,
953 &func_arg,
954 new_thread_id,
955 )?,
956 }
957 if let Some(thread_info_place) = thread {
960 this.write_scalar(
961 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
962 &thread_info_place,
963 )?;
964 }
965
966 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
969
970 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
972 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
973 }
974
975 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
977
978 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
982
983 this.call_thread_root_function(
984 instance,
985 start_abi,
986 &[func_arg],
987 Some(&ret_place),
988 current_span,
989 )?;
990
991 this.machine.threads.set_active_thread_id(old_thread_id);
993
994 interp_ok(new_thread_id)
995 }
996
997 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
1002 let this = self.eval_context_mut();
1003
1004 let thread = this.active_thread_mut();
1006 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
1007 thread.state = ThreadState::Terminated;
1008
1009 let gone_thread = this.active_thread();
1011 {
1012 let mut free_tls_statics = Vec::new();
1013 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
1014 if thread != gone_thread {
1015 return true;
1017 }
1018 free_tls_statics.push(alloc_id);
1021 false
1022 });
1023 for ptr in free_tls_statics {
1025 match tls_alloc_action {
1026 TlsAllocAction::Deallocate =>
1027 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
1028 TlsAllocAction::Leak =>
1029 if let Some(alloc) = ptr.provenance.get_alloc_id() {
1030 trace!(
1031 "Thread-local static leaked and stored as static root: {:?}",
1032 alloc
1033 );
1034 this.machine.static_roots.push(alloc);
1035 },
1036 }
1037 }
1038 }
1039
1040 match &mut this.machine.data_race {
1041 GlobalDataRaceHandler::None => {}
1042 GlobalDataRaceHandler::Vclocks(data_race) =>
1043 data_race.thread_terminated(&this.machine.threads),
1044 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
1045 genmc_ctx.handle_thread_finish(&this.machine.threads)
1048 }
1049 }
1050
1051 let unblock_reason = BlockReason::Join(gone_thread);
1053 let threads = &this.machine.threads.threads;
1054 let joining_threads = threads
1055 .iter_enumerated()
1056 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
1057 .map(|(id, _)| id)
1058 .collect::<Vec<_>>();
1059 for thread in joining_threads {
1060 this.unblock_thread(thread, unblock_reason)?;
1061 }
1062
1063 interp_ok(())
1064 }
1065
1066 #[inline]
1069 fn block_thread(
1070 &mut self,
1071 reason: BlockReason,
1072 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
1073 callback: DynUnblockCallback<'tcx>,
1074 ) {
1075 let this = self.eval_context_mut();
1076 if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1077 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1078 }
1079 let timeout = timeout.map(|(clock, anchor, duration)| {
1080 let anchor = match clock {
1081 TimeoutClock::RealTime => {
1082 assert!(
1083 this.machine.communicate(),
1084 "cannot have `RealTime` timeout with isolation enabled!"
1085 );
1086 Timeout::RealTime(match anchor {
1087 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1088 TimeoutAnchor::Relative => SystemTime::now(),
1089 })
1090 }
1091 TimeoutClock::Monotonic =>
1092 Timeout::Monotonic(match anchor {
1093 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
1094 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
1095 }),
1096 };
1097 anchor.add_lossy(duration)
1098 });
1099 this.machine.threads.block_thread(reason, timeout, callback);
1100 }
1101
1102 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1105 let this = self.eval_context_mut();
1106 let old_state =
1107 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1108 let callback = match old_state {
1109 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1110 assert_eq!(
1111 reason, actual_reason,
1112 "unblock_thread: thread was blocked for the wrong reason"
1113 );
1114 callback
1115 }
1116 _ => panic!("unblock_thread: thread was not blocked"),
1117 };
1118 let old_thread = this.machine.threads.set_active_thread_id(thread);
1120 callback.call(this, UnblockKind::Ready)?;
1121 this.machine.threads.set_active_thread_id(old_thread);
1122 interp_ok(())
1123 }
1124
1125 #[inline]
1126 fn detach_thread(
1127 &mut self,
1128 thread_id: ThreadId,
1129 allow_terminated_joined: bool,
1130 ) -> InterpResult<'tcx> {
1131 let this = self.eval_context_mut();
1132 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1133 }
1134
1135 fn join_thread(
1139 &mut self,
1140 joined_thread_id: ThreadId,
1141 success_retval: Scalar,
1142 return_dest: &MPlaceTy<'tcx>,
1143 ) -> InterpResult<'tcx> {
1144 let this = self.eval_context_mut();
1145 let thread_mgr = &mut this.machine.threads;
1146 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1147 throw_ub_format!("trying to join a detached thread");
1149 }
1150
1151 fn after_join<'tcx>(
1152 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1153 joined_thread_id: ThreadId,
1154 success_retval: Scalar,
1155 return_dest: &MPlaceTy<'tcx>,
1156 ) -> InterpResult<'tcx> {
1157 let threads = &this.machine.threads;
1158 match &mut this.machine.data_race {
1159 GlobalDataRaceHandler::None => {}
1160 GlobalDataRaceHandler::Vclocks(data_race) =>
1161 data_race.thread_joined(threads, joined_thread_id),
1162 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1163 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1164 }
1165 this.write_scalar(success_retval, return_dest)?;
1166 interp_ok(())
1167 }
1168
1169 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1172 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1173 trace!(
1174 "{:?} blocked on {:?} when trying to join",
1175 thread_mgr.active_thread, joined_thread_id
1176 );
1177 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1178 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1179 }
1180
1181 let dest = return_dest.clone();
1184 thread_mgr.block_thread(
1185 BlockReason::Join(joined_thread_id),
1186 None,
1187 callback!(
1188 @capture<'tcx> {
1189 joined_thread_id: ThreadId,
1190 dest: MPlaceTy<'tcx>,
1191 success_retval: Scalar,
1192 }
1193 |this, unblock: UnblockKind| {
1194 assert_eq!(unblock, UnblockKind::Ready);
1195 after_join(this, joined_thread_id, success_retval, &dest)
1196 }
1197 ),
1198 );
1199 } else {
1200 after_join(this, joined_thread_id, success_retval, return_dest)?;
1202 }
1203 interp_ok(())
1204 }
1205
1206 fn join_thread_exclusive(
1211 &mut self,
1212 joined_thread_id: ThreadId,
1213 success_retval: Scalar,
1214 return_dest: &MPlaceTy<'tcx>,
1215 ) -> InterpResult<'tcx> {
1216 let this = self.eval_context_mut();
1217 let threads = &this.machine.threads.threads;
1218 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1219 throw_ub_format!("trying to join an already joined thread");
1220 }
1221
1222 if joined_thread_id == this.machine.threads.active_thread {
1223 throw_ub_format!("trying to join itself");
1224 }
1225
1226 assert!(
1228 threads
1229 .iter()
1230 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1231 "this thread already has threads waiting for its termination"
1232 );
1233
1234 this.join_thread(joined_thread_id, success_retval, return_dest)
1235 }
1236
1237 #[inline]
1238 fn active_thread(&self) -> ThreadId {
1239 let this = self.eval_context_ref();
1240 this.machine.threads.active_thread()
1241 }
1242
1243 #[inline]
1244 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1245 let this = self.eval_context_mut();
1246 this.machine.threads.active_thread_mut()
1247 }
1248
1249 #[inline]
1250 fn active_thread_ref(&self) -> &Thread<'tcx> {
1251 let this = self.eval_context_ref();
1252 this.machine.threads.active_thread_ref()
1253 }
1254
1255 #[inline]
1256 fn get_total_thread_count(&self) -> usize {
1257 let this = self.eval_context_ref();
1258 this.machine.threads.get_total_thread_count()
1259 }
1260
1261 #[inline]
1262 fn have_all_terminated(&self) -> bool {
1263 let this = self.eval_context_ref();
1264 this.machine.threads.have_all_terminated()
1265 }
1266
1267 #[inline]
1268 fn enable_thread(&mut self, thread_id: ThreadId) {
1269 let this = self.eval_context_mut();
1270 this.machine.threads.enable_thread(thread_id);
1271 }
1272
1273 #[inline]
1274 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1275 let this = self.eval_context_ref();
1276 this.machine.threads.active_thread_stack()
1277 }
1278
1279 #[inline]
1280 fn active_thread_stack_mut<'a>(
1281 &'a mut self,
1282 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1283 let this = self.eval_context_mut();
1284 this.machine.threads.active_thread_stack_mut()
1285 }
1286
1287 #[inline]
1289 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1290 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1291 }
1292
1293 #[inline]
1294 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1295 where
1296 'tcx: 'c,
1297 {
1298 self.eval_context_ref().machine.threads.get_thread_name(thread)
1299 }
1300
1301 #[inline]
1302 fn yield_active_thread(&mut self) {
1303 self.eval_context_mut().machine.threads.yield_active_thread();
1304 }
1305
1306 #[inline]
1307 fn maybe_preempt_active_thread(&mut self) {
1308 use rand::Rng as _;
1309
1310 let this = self.eval_context_mut();
1311 if !this.machine.threads.fixed_scheduling
1312 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1313 {
1314 this.yield_active_thread();
1315 }
1316 }
1317
1318 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1321 let this = self.eval_context_mut();
1322 loop {
1323 if CTRL_C_RECEIVED.load(Relaxed) {
1324 this.machine.handle_abnormal_termination();
1325 throw_machine_stop!(TerminationInfo::Interrupted);
1326 }
1327 match this.schedule()? {
1328 SchedulingAction::ExecuteStep => {
1329 if !this.step()? {
1330 match this.run_on_stack_empty()? {
1332 Poll::Pending => {} Poll::Ready(()) =>
1334 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1335 }
1336 }
1337 }
1338 SchedulingAction::SleepAndWaitForIo(duration) => {
1339 if this.machine.communicate() {
1340 this.poll_and_unblock(duration)?;
1345 } else {
1346 let duration = duration.expect(
1347 "Infinite sleep should not be triggered when isolation is enabled",
1348 );
1349 this.machine.monotonic_clock.sleep(duration);
1350 }
1351 }
1352 }
1353 }
1354 }
1355}