1use std::sync::atomic::Ordering::Relaxed;
4use std::task::Poll;
5use std::time::{Duration, SystemTime};
6use std::{io, mem};
7
8use rand::RngExt;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::either::Either;
13use rustc_data_structures::fx::FxHashMap;
14use rustc_hir::def_id::DefId;
15use rustc_index::{Idx, IndexVec};
16use rustc_middle::mir::Mutability;
17use rustc_middle::ty::layout::TyAndLayout;
18use rustc_span::{DUMMY_SP, Span};
19use rustc_target::spec::Os;
20
21use crate::concurrency::GlobalDataRaceHandler;
22use crate::shims::{Epoll, EpollEvalContextExt, FileDescriptionRef, tls};
23use crate::*;
24
25#[derive(Clone, Copy, Debug, PartialEq)]
26enum SchedulingAction {
27 ExecuteStep,
29 SleepAndWaitForIo(Option<Duration>),
34}
35
36#[derive(Clone, Copy, Debug, PartialEq)]
38pub enum TlsAllocAction {
39 Deallocate,
41 Leak,
44}
45
46#[derive(Clone, Copy, Debug, PartialEq)]
48pub enum UnblockKind {
49 Ready,
51 TimedOut,
53}
54
55pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
58
59#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
61pub struct ThreadId(u32);
62
63impl ThreadId {
64 pub fn to_u32(self) -> u32 {
65 self.0
66 }
67
68 pub fn new_unchecked(id: u32) -> Self {
70 Self(id)
71 }
72
73 pub const MAIN_THREAD: ThreadId = ThreadId(0);
74}
75
76impl Idx for ThreadId {
77 fn new(idx: usize) -> Self {
78 ThreadId(u32::try_from(idx).unwrap())
79 }
80
81 fn index(self) -> usize {
82 usize::try_from(self.0).unwrap()
83 }
84}
85
86impl From<ThreadId> for u64 {
87 fn from(t: ThreadId) -> Self {
88 t.0.into()
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
94pub enum BlockReason {
95 Join(ThreadId),
98 Sleep,
100 Mutex,
102 Condvar,
104 RwLock,
106 Futex,
108 InitOnce,
110 Epoll { epfd: FileDescriptionRef<Epoll> },
112 Eventfd,
114 VirtualSocket,
116 IO,
118 Genmc,
121}
122
123enum ThreadState<'tcx> {
125 Enabled,
127 Blocked { reason: BlockReason, deadline: Option<Deadline>, callback: DynUnblockCallback<'tcx> },
129 Terminated,
132}
133
134impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Enabled => write!(f, "Enabled"),
138 Self::Blocked { reason, deadline, .. } =>
139 f.debug_struct("Blocked")
140 .field("reason", reason)
141 .field("deadline", deadline)
142 .finish(),
143 Self::Terminated => write!(f, "Terminated"),
144 }
145 }
146}
147
148impl<'tcx> ThreadState<'tcx> {
149 fn is_enabled(&self) -> bool {
150 matches!(self, ThreadState::Enabled)
151 }
152
153 fn is_terminated(&self) -> bool {
154 matches!(self, ThreadState::Terminated)
155 }
156
157 fn is_blocked_on(&self, reason: &BlockReason) -> bool {
158 matches!(self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
159 }
160}
161
162#[derive(Debug, Copy, Clone, PartialEq, Eq)]
164enum ThreadJoinStatus {
165 Joinable,
167 Detached,
170 Joined,
172}
173
174pub struct Thread<'tcx> {
176 state: ThreadState<'tcx>,
177
178 thread_name: Option<Vec<u8>>,
180
181 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
183
184 pub(crate) origin_span: Span,
187
188 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
193
194 top_user_relevant_frame: Option<usize>,
199
200 join_status: ThreadJoinStatus,
202
203 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
212
213 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
215}
216
217pub type StackEmptyCallback<'tcx> =
218 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
219
220impl<'tcx> Thread<'tcx> {
221 fn thread_name(&self) -> Option<&[u8]> {
223 self.thread_name.as_deref()
224 }
225
226 pub fn is_enabled(&self) -> bool {
228 self.state.is_enabled()
229 }
230
231 fn thread_display_name(&self, id: ThreadId) -> String {
233 if let Some(ref thread_name) = self.thread_name {
234 String::from_utf8_lossy(thread_name).into_owned()
235 } else {
236 format!("unnamed-{}", id.index())
237 }
238 }
239
240 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
246 let mut best = None;
248 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
249 let relevance = frame.extra.user_relevance;
250 if relevance == u8::MAX {
251 return Some(idx);
253 }
254 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
255 best = Some((idx, relevance));
258 }
259 }
260 best.map(|(idx, _relevance)| idx)
261 }
262
263 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
266 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
267 }
268
269 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
272 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
273 self.top_user_relevant_frame = Some(frame_idx);
274 }
275
276 pub fn top_user_relevant_frame(&self) -> Option<usize> {
279 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
283 }
284
285 pub fn current_user_relevance(&self) -> u8 {
286 self.top_user_relevant_frame()
287 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
288 .unwrap_or(0)
289 }
290
291 pub fn current_user_relevant_span(&self) -> Span {
292 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
293 self.top_user_relevant_frame()
294 .map(|frame_idx| self.stack[frame_idx].current_span())
295 .unwrap_or(rustc_span::DUMMY_SP)
296 }
297}
298
299impl<'tcx> std::fmt::Debug for Thread<'tcx> {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 write!(
302 f,
303 "{}({:?}, {:?})",
304 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
305 self.state,
306 self.join_status
307 )
308 }
309}
310
311impl<'tcx> Thread<'tcx> {
312 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
313 Self {
314 state: ThreadState::Enabled,
315 thread_name: name.map(|name| Vec::from(name.as_bytes())),
316 stack: Vec::new(),
317 origin_span: DUMMY_SP,
318 top_user_relevant_frame: None,
319 join_status: ThreadJoinStatus::Joinable,
320 unwind_payloads: Vec::new(),
321 last_error: None,
322 on_stack_empty,
323 }
324 }
325}
326
327impl VisitProvenance for Thread<'_> {
328 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
329 let Thread {
330 unwind_payloads: panic_payload,
331 last_error,
332 stack,
333 origin_span: _,
334 top_user_relevant_frame: _,
335 state: _,
336 thread_name: _,
337 join_status: _,
338 on_stack_empty: _, } = self;
340
341 for payload in panic_payload {
342 payload.visit_provenance(visit);
343 }
344 last_error.visit_provenance(visit);
345 for frame in stack {
346 frame.visit_provenance(visit)
347 }
348 }
349}
350
351impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
352 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
353 let return_place = self.return_place();
354 let Frame {
355 locals,
356 extra,
357 ..
359 } = self;
360
361 return_place.visit_provenance(visit);
363 for local in locals.iter() {
365 match local.as_mplace_or_imm() {
366 None => {}
367 Some(Either::Left((ptr, meta))) => {
368 ptr.visit_provenance(visit);
369 meta.visit_provenance(visit);
370 }
371 Some(Either::Right(imm)) => {
372 imm.visit_provenance(visit);
373 }
374 }
375 }
376
377 extra.visit_provenance(visit);
378 }
379}
380
381#[derive(Debug, Copy, Clone)]
383pub enum ThreadLookupError {
384 InvalidId,
386 Terminated(ThreadId),
388}
389
390#[derive(Debug)]
392pub struct ThreadManager<'tcx> {
393 active_thread: ThreadId,
395 threads: IndexVec<ThreadId, Thread<'tcx>>,
399 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
401 yield_active_thread: bool,
404 fixed_scheduling: bool,
406}
407
408impl VisitProvenance for ThreadManager<'_> {
409 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
410 let ThreadManager {
411 threads,
412 thread_local_allocs,
413 active_thread: _,
414 yield_active_thread: _,
415 fixed_scheduling: _,
416 } = self;
417
418 for thread in threads {
419 thread.visit_provenance(visit);
420 }
421 for ptr in thread_local_allocs.values() {
422 ptr.visit_provenance(visit);
423 }
424 }
425}
426
427impl<'tcx> ThreadManager<'tcx> {
428 pub(crate) fn new(config: &MiriConfig) -> Self {
429 let mut threads = IndexVec::new();
430 threads.push(Thread::new(Some("main"), None));
432 Self {
433 active_thread: ThreadId::MAIN_THREAD,
434 threads,
435 thread_local_allocs: Default::default(),
436 yield_active_thread: false,
437 fixed_scheduling: config.fixed_scheduling,
438 }
439 }
440
441 pub(crate) fn init(
442 ecx: &mut MiriInterpCx<'tcx>,
443 on_main_stack_empty: StackEmptyCallback<'tcx>,
444 ) {
445 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
446 Some(on_main_stack_empty);
447 if ecx.tcx.sess.target.os != Os::Windows {
448 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
450 ThreadJoinStatus::Detached;
451 }
452 }
453
454 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
458 if let Ok(id) = id.try_into()
459 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
460 {
461 let thread_id = ThreadId(id);
462 if self.threads[thread_id].state.is_terminated() {
463 Err(ThreadLookupError::Terminated(thread_id))
464 } else {
465 Ok(thread_id)
466 }
467 } else {
468 Err(ThreadLookupError::InvalidId)
469 }
470 }
471
472 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
475 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
476 }
477
478 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
483 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
484 }
485
486 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
488 &self.threads[self.active_thread].stack
489 }
490
491 pub fn active_thread_stack_mut(
493 &mut self,
494 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
495 &mut self.threads[self.active_thread].stack
496 }
497
498 pub fn all_blocked_stacks(
499 &self,
500 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
501 self.threads
502 .iter_enumerated()
503 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
504 .map(|(id, t)| (id, &t.stack[..]))
505 }
506
507 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
509 let new_thread_id = ThreadId::new(self.threads.len());
510 self.threads.push(Thread::new(None, Some(on_stack_empty)));
511 new_thread_id
512 }
513
514 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
516 assert!(id.index() < self.threads.len());
517 info!(
518 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
519 self.get_thread_display_name(id),
520 self.get_thread_display_name(self.active_thread)
521 );
522 std::mem::replace(&mut self.active_thread, id)
523 }
524
525 pub fn active_thread(&self) -> ThreadId {
527 self.active_thread
528 }
529
530 pub fn get_total_thread_count(&self) -> usize {
532 self.threads.len()
533 }
534
535 pub fn get_live_thread_count(&self) -> usize {
538 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
539 }
540
541 fn has_terminated(&self, thread_id: ThreadId) -> bool {
543 self.threads[thread_id].state.is_terminated()
544 }
545
546 fn have_all_terminated(&self) -> bool {
548 self.threads.iter().all(|thread| thread.state.is_terminated())
549 }
550
551 fn enable_thread(&mut self, thread_id: ThreadId) {
553 assert!(self.has_terminated(thread_id));
554 self.threads[thread_id].state = ThreadState::Enabled;
555 }
556
557 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
559 &mut self.threads[self.active_thread]
560 }
561
562 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
564 &self.threads[self.active_thread]
565 }
566
567 pub fn thread_ref(&self, thread_id: ThreadId) -> &Thread<'tcx> {
568 &self.threads[thread_id]
569 }
570
571 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
580 trace!("detaching {:?}", id);
582
583 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
584 self.threads[id].join_status == ThreadJoinStatus::Detached
586 } else {
587 self.threads[id].join_status != ThreadJoinStatus::Joinable
588 };
589 if is_ub {
590 throw_ub_format!("trying to detach thread that was already detached or joined");
591 }
592
593 self.threads[id].join_status = ThreadJoinStatus::Detached;
594 interp_ok(())
595 }
596
597 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
599 self.threads[thread].thread_name = Some(new_thread_name);
600 }
601
602 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
604 self.threads[thread].thread_name()
605 }
606
607 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
608 self.threads[thread].thread_display_name(thread)
609 }
610
611 fn block_thread(
613 &mut self,
614 reason: BlockReason,
615 deadline: Option<Deadline>,
616 callback: DynUnblockCallback<'tcx>,
617 ) {
618 let state = &mut self.threads[self.active_thread].state;
619 assert!(state.is_enabled());
620 *state = ThreadState::Blocked { reason, deadline, callback }
621 }
622
623 fn yield_active_thread(&mut self) {
625 self.yield_active_thread = true;
629 }
630}
631
632impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
633trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
634 #[inline]
635 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
636 let this = self.eval_context_mut();
637 let active_thread = this.active_thread_mut();
638 active_thread.origin_span = DUMMY_SP; let mut callback = active_thread
640 .on_stack_empty
641 .take()
642 .expect("`on_stack_empty` not set up, or already running");
643 let res = callback(this)?;
644 this.active_thread_mut().on_stack_empty = Some(callback);
645 interp_ok(res)
646 }
647
648 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
657 let this = self.eval_context_mut();
658
659 if this.machine.data_race.as_genmc_ref().is_some() {
661 loop {
662 let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
663 let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
664 return interp_ok(SchedulingAction::ExecuteStep);
665 };
666 if this.machine.threads.threads[next_thread_id]
668 .state
669 .is_blocked_on(&BlockReason::Genmc)
670 {
671 info!(
672 "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
673 );
674 this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
675 }
676 let thread_manager = &mut this.machine.threads;
679 if thread_manager.threads[next_thread_id].state.is_enabled() {
680 thread_manager.active_thread = next_thread_id;
682 return interp_ok(SchedulingAction::ExecuteStep);
683 }
684 }
685 }
686
687 let thread_manager = &this.machine.threads;
689 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
691 && !thread_manager.yield_active_thread
692 {
693 return interp_ok(SchedulingAction::ExecuteStep);
695 }
696
697 if this.machine.communicate() {
701 this.poll_and_unblock(Some(Duration::ZERO))?;
708 }
709
710 let potential_sleep_time = this.unblock_expired_deadlines()?;
716
717 let thread_manager = &mut this.machine.threads;
718 let rng = this.machine.rng.get_mut();
719
720 let mut threads_iter = thread_manager
727 .threads
728 .iter_enumerated()
729 .skip(thread_manager.active_thread.index() + 1)
730 .chain(
731 thread_manager
732 .threads
733 .iter_enumerated()
734 .take(thread_manager.active_thread.index() + 1),
735 )
736 .filter(|(_id, thread)| thread.state.is_enabled());
737 let new_thread = if thread_manager.fixed_scheduling {
739 let next = threads_iter.next();
740 drop(threads_iter);
741 next
742 } else {
743 threads_iter.choose(rng)
744 };
745
746 if let Some((id, _thread)) = new_thread {
747 if thread_manager.active_thread != id {
748 info!(
749 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
750 thread_manager.get_thread_display_name(id),
751 thread_manager.get_thread_display_name(thread_manager.active_thread)
752 );
753 thread_manager.active_thread = id;
754 }
755 }
756 thread_manager.yield_active_thread = false;
758
759 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
760 return interp_ok(SchedulingAction::ExecuteStep);
761 }
762
763 let threads = &this.machine.threads.threads;
765
766 if threads.iter().all(|thread| thread.state.is_terminated()) {
767 unreachable!("all threads terminated without the main thread terminating?!");
768 } else if let Some(sleep_time) = potential_sleep_time {
769 interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
773 } else if threads.iter().any(|thread| this.is_thread_blocked_on_host(thread)) {
774 interp_ok(SchedulingAction::SleepAndWaitForIo(None))
778 } else {
779 throw_machine_stop!(TerminationInfo::GlobalDeadlock);
780 }
781 }
782
783 fn is_thread_blocked_on_host(&self, thread: &Thread<'tcx>) -> bool {
787 let this = self.eval_context_ref();
788 match &thread.state {
789 ThreadState::Blocked { reason: BlockReason::IO, .. } => true,
790 ThreadState::Blocked { reason: BlockReason::Epoll { epfd }, .. } =>
791 this.has_epoll_host_interests(epfd),
792 _ => false,
793 }
794 }
795
796 fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
802 let this = self.eval_context_mut();
803
804 match BlockingIoManager::poll(this, timeout)? {
805 Ok(_) => interp_ok(()),
806 Err(e) if e.kind() == io::ErrorKind::Interrupted => interp_ok(()),
808 Err(e) => panic!("unexpected error while polling: {e}"),
811 }
812 }
813
814 fn unblock_expired_deadlines(&mut self) -> InterpResult<'tcx, Option<Duration>> {
819 let this = self.eval_context_mut();
820 let communicate = this.machine.communicate();
821
822 let mut min_wait_time = Option::<Duration>::None;
823 let mut callbacks = Vec::new();
824
825 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
826 match &thread.state {
827 ThreadState::Blocked { deadline: Some(deadline), .. } => {
828 let wait_time = match deadline {
829 Deadline::Monotonic(instant) =>
830 instant.duration_since(this.machine.monotonic_clock.now()),
831 Deadline::RealTime(time) => {
832 assert!(communicate, "cannot have `RealTime` timeout with isolation");
833 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)
834 }
835 };
836
837 if wait_time.is_zero() {
838 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
840 let ThreadState::Blocked { callback, .. } = old_state else {
841 unreachable!()
842 };
843 callbacks.push((id, callback));
845 } else {
846 min_wait_time = Some(wait_time.min(min_wait_time.unwrap_or(Duration::MAX)));
849 }
850 }
851 _ => {}
852 }
853 }
854
855 for (thread, callback) in callbacks {
856 let old_thread = this.machine.threads.set_active_thread_id(thread);
863 callback.call(this, UnblockKind::TimedOut)?;
864 this.machine.threads.set_active_thread_id(old_thread);
865 }
866
867 interp_ok(min_wait_time)
868 }
869}
870
871impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
873pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
874 #[inline]
875 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
876 self.eval_context_ref().machine.threads.thread_id_try_from(id)
877 }
878
879 fn get_or_create_thread_local_alloc(
882 &mut self,
883 def_id: DefId,
884 ) -> InterpResult<'tcx, StrictPointer> {
885 let this = self.eval_context_mut();
886 let tcx = this.tcx;
887 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
888 interp_ok(old_alloc)
891 } else {
892 if tcx.is_foreign_item(def_id) {
896 throw_unsup_format!("foreign thread-local statics are not supported");
897 }
898 let params = this.machine.get_default_alloc_params();
899 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
900 let mut alloc = alloc.inner().adjust_from_tcx(
902 &this.tcx,
903 |bytes, align| {
904 interp_ok(MiriAllocBytes::from_bytes(
905 std::borrow::Cow::Borrowed(bytes),
906 align,
907 params,
908 ))
909 },
910 |ptr| this.global_root_pointer(ptr),
911 )?;
912 alloc.mutability = Mutability::Mut;
914 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
916 this.machine.threads.set_thread_local_alloc(def_id, ptr);
917 interp_ok(ptr)
918 }
919 }
920
921 #[inline]
923 fn start_regular_thread(
924 &mut self,
925 thread: Option<MPlaceTy<'tcx>>,
926 start_routine: Pointer,
927 start_abi: ExternAbi,
928 func_arg: ImmTy<'tcx>,
929 ret_layout: TyAndLayout<'tcx>,
930 ) -> InterpResult<'tcx, ThreadId> {
931 let this = self.eval_context_mut();
932
933 let current_span = this.machine.current_user_relevant_span();
935 let new_thread_id = this.machine.threads.create_thread({
936 let mut state = tls::TlsDtorsState::default();
937 Box::new(move |m| state.on_stack_empty(m))
938 });
939 match &mut this.machine.data_race {
940 GlobalDataRaceHandler::None => {}
941 GlobalDataRaceHandler::Vclocks(data_race) =>
942 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
943 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
944 genmc_ctx.handle_thread_create(
945 &this.machine.threads,
946 start_routine,
947 &func_arg,
948 new_thread_id,
949 )?,
950 }
951 if let Some(thread_info_place) = thread {
954 this.write_scalar(
955 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
956 &thread_info_place,
957 )?;
958 }
959
960 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
963
964 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
966 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
967 }
968
969 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
971
972 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
976
977 this.call_thread_root_function(
978 instance,
979 start_abi,
980 &[func_arg],
981 Some(&ret_place),
982 current_span,
983 )?;
984
985 this.machine.threads.set_active_thread_id(old_thread_id);
987
988 interp_ok(new_thread_id)
989 }
990
991 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
996 let this = self.eval_context_mut();
997
998 let thread = this.active_thread_mut();
1000 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
1001 thread.state = ThreadState::Terminated;
1002
1003 let gone_thread = this.active_thread();
1005 {
1006 let mut free_tls_statics = Vec::new();
1007 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
1008 if thread != gone_thread {
1009 return true;
1011 }
1012 free_tls_statics.push(alloc_id);
1015 false
1016 });
1017 for ptr in free_tls_statics {
1019 match tls_alloc_action {
1020 TlsAllocAction::Deallocate =>
1021 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
1022 TlsAllocAction::Leak =>
1023 if let Some(alloc) = ptr.provenance.get_alloc_id() {
1024 trace!(
1025 "Thread-local static leaked and stored as static root: {:?}",
1026 alloc
1027 );
1028 this.machine.static_roots.push(alloc);
1029 },
1030 }
1031 }
1032 }
1033
1034 match &mut this.machine.data_race {
1035 GlobalDataRaceHandler::None => {}
1036 GlobalDataRaceHandler::Vclocks(data_race) =>
1037 data_race.thread_terminated(&this.machine.threads),
1038 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
1039 genmc_ctx.handle_thread_finish(&this.machine.threads)
1042 }
1043 }
1044
1045 let unblock_reason = BlockReason::Join(gone_thread);
1047 let threads = &this.machine.threads.threads;
1048 let joining_threads = threads
1049 .iter_enumerated()
1050 .filter(|(_, thread)| thread.state.is_blocked_on(&unblock_reason))
1051 .map(|(id, _)| id)
1052 .collect::<Vec<_>>();
1053 for thread in joining_threads {
1054 this.unblock_thread(thread, unblock_reason.clone())?;
1055 }
1056
1057 interp_ok(())
1058 }
1059
1060 #[inline]
1063 fn block_thread(
1064 &mut self,
1065 reason: BlockReason,
1066 deadline: Option<Deadline>,
1067 callback: DynUnblockCallback<'tcx>,
1068 ) {
1069 let this = self.eval_context_mut();
1070 if deadline.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1071 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1072 }
1073 if matches!(deadline, Some(Deadline::RealTime(_))) && !this.machine.communicate() {
1074 panic!("cannot have `RealTime` timeout with isolation");
1075 }
1076 this.machine.threads.block_thread(reason, deadline, callback);
1077 }
1078
1079 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1082 let this = self.eval_context_mut();
1083 let old_state =
1084 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1085 let callback = match old_state {
1086 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1087 assert_eq!(
1088 reason, actual_reason,
1089 "unblock_thread: thread was blocked for the wrong reason"
1090 );
1091 callback
1092 }
1093 _ => panic!("unblock_thread: thread was not blocked"),
1094 };
1095 let old_thread = this.machine.threads.set_active_thread_id(thread);
1097 callback.call(this, UnblockKind::Ready)?;
1098 this.machine.threads.set_active_thread_id(old_thread);
1099 interp_ok(())
1100 }
1101
1102 #[inline]
1103 fn detach_thread(
1104 &mut self,
1105 thread_id: ThreadId,
1106 allow_terminated_joined: bool,
1107 ) -> InterpResult<'tcx> {
1108 let this = self.eval_context_mut();
1109 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1110 }
1111
1112 fn join_thread(
1116 &mut self,
1117 joined_thread_id: ThreadId,
1118 success_retval: Scalar,
1119 return_dest: &MPlaceTy<'tcx>,
1120 ) -> InterpResult<'tcx> {
1121 let this = self.eval_context_mut();
1122 let thread_mgr = &mut this.machine.threads;
1123 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1124 throw_ub_format!("trying to join a detached thread");
1126 }
1127
1128 fn after_join<'tcx>(
1129 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1130 joined_thread_id: ThreadId,
1131 success_retval: Scalar,
1132 return_dest: &MPlaceTy<'tcx>,
1133 ) -> InterpResult<'tcx> {
1134 let threads = &this.machine.threads;
1135 match &mut this.machine.data_race {
1136 GlobalDataRaceHandler::None => {}
1137 GlobalDataRaceHandler::Vclocks(data_race) =>
1138 data_race.thread_joined(threads, joined_thread_id),
1139 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1140 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1141 }
1142 this.write_scalar(success_retval, return_dest)?;
1143 interp_ok(())
1144 }
1145
1146 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1149 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1150 trace!(
1151 "{:?} blocked on {:?} when trying to join",
1152 thread_mgr.active_thread, joined_thread_id
1153 );
1154 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1155 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1156 }
1157
1158 let dest = return_dest.clone();
1161 thread_mgr.block_thread(
1162 BlockReason::Join(joined_thread_id),
1163 None,
1164 callback!(
1165 @capture<'tcx> {
1166 joined_thread_id: ThreadId,
1167 dest: MPlaceTy<'tcx>,
1168 success_retval: Scalar,
1169 }
1170 |this, unblock: UnblockKind| {
1171 assert_eq!(unblock, UnblockKind::Ready);
1172 after_join(this, joined_thread_id, success_retval, &dest)
1173 }
1174 ),
1175 );
1176 } else {
1177 after_join(this, joined_thread_id, success_retval, return_dest)?;
1179 }
1180 interp_ok(())
1181 }
1182
1183 fn join_thread_exclusive(
1188 &mut self,
1189 joined_thread_id: ThreadId,
1190 success_retval: Scalar,
1191 return_dest: &MPlaceTy<'tcx>,
1192 ) -> InterpResult<'tcx> {
1193 let this = self.eval_context_mut();
1194 let threads = &this.machine.threads.threads;
1195 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1196 throw_ub_format!("trying to join an already joined thread");
1197 }
1198
1199 if joined_thread_id == this.machine.threads.active_thread {
1200 throw_ub_format!("trying to join itself");
1201 }
1202
1203 assert!(
1205 threads.iter().all(|thread| {
1206 !thread.state.is_blocked_on(&BlockReason::Join(joined_thread_id))
1207 }),
1208 "this thread already has threads waiting for its termination"
1209 );
1210
1211 this.join_thread(joined_thread_id, success_retval, return_dest)
1212 }
1213
1214 #[inline]
1215 fn active_thread(&self) -> ThreadId {
1216 let this = self.eval_context_ref();
1217 this.machine.threads.active_thread()
1218 }
1219
1220 #[inline]
1221 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1222 let this = self.eval_context_mut();
1223 this.machine.threads.active_thread_mut()
1224 }
1225
1226 #[inline]
1227 fn active_thread_ref(&self) -> &Thread<'tcx> {
1228 let this = self.eval_context_ref();
1229 this.machine.threads.active_thread_ref()
1230 }
1231
1232 #[inline]
1233 fn get_total_thread_count(&self) -> usize {
1234 let this = self.eval_context_ref();
1235 this.machine.threads.get_total_thread_count()
1236 }
1237
1238 #[inline]
1239 fn have_all_terminated(&self) -> bool {
1240 let this = self.eval_context_ref();
1241 this.machine.threads.have_all_terminated()
1242 }
1243
1244 #[inline]
1245 fn enable_thread(&mut self, thread_id: ThreadId) {
1246 let this = self.eval_context_mut();
1247 this.machine.threads.enable_thread(thread_id);
1248 }
1249
1250 #[inline]
1251 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1252 let this = self.eval_context_ref();
1253 this.machine.threads.active_thread_stack()
1254 }
1255
1256 #[inline]
1257 fn active_thread_stack_mut<'a>(
1258 &'a mut self,
1259 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1260 let this = self.eval_context_mut();
1261 this.machine.threads.active_thread_stack_mut()
1262 }
1263
1264 #[inline]
1266 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1267 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1268 }
1269
1270 #[inline]
1271 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1272 where
1273 'tcx: 'c,
1274 {
1275 self.eval_context_ref().machine.threads.get_thread_name(thread)
1276 }
1277
1278 #[inline]
1279 fn yield_active_thread(&mut self) {
1280 self.eval_context_mut().machine.threads.yield_active_thread();
1281 }
1282
1283 #[inline]
1284 fn maybe_preempt_active_thread(&mut self) {
1285 let this = self.eval_context_mut();
1286 if !this.machine.threads.fixed_scheduling
1287 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1288 {
1289 this.yield_active_thread();
1290 }
1291 }
1292
1293 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1296 let this = self.eval_context_mut();
1297 loop {
1298 if CTRL_C_RECEIVED.load(Relaxed) {
1299 this.machine.handle_abnormal_termination();
1300 throw_machine_stop!(TerminationInfo::Interrupted);
1301 }
1302 match this.schedule()? {
1303 SchedulingAction::ExecuteStep => {
1304 if !this.step()? {
1305 match this.run_on_stack_empty()? {
1307 Poll::Pending => {} Poll::Ready(()) =>
1309 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1310 }
1311 }
1312 }
1313 SchedulingAction::SleepAndWaitForIo(duration) => {
1314 if this.machine.communicate() {
1315 this.poll_and_unblock(duration)?;
1320 } else {
1321 let duration = duration.expect(
1322 "Infinite sleep should not be triggered when isolation is enabled",
1323 );
1324 this.machine.monotonic_clock.sleep(duration);
1325 }
1326 }
1327 }
1328 }
1329 }
1330}