1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use rand::RngExt;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::either::Either;
13use rustc_data_structures::fx::FxHashMap;
14use rustc_hir::def_id::DefId;
15use rustc_index::{Idx, IndexVec};
16use rustc_middle::mir::Mutability;
17use rustc_middle::ty::layout::TyAndLayout;
18use rustc_span::{DUMMY_SP, Span};
19use rustc_target::spec::Os;
20
21use crate::concurrency::GlobalDataRaceHandler;
22use crate::shims::{Epoll, EpollEvalContextExt, FileDescriptionRef, tls};
23use crate::*;
24
/// What the interpreter main loop should do next, as decided by the scheduler.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchedulingAction {
    /// Execute one step on the currently active thread.
    ExecuteStep,
    /// No thread can make progress right now. The main loop either polls for host I/O (when
    /// communication with the host is enabled) or sleeps on the monotonic clock. The payload
    /// is the maximal time to wait; `None` means there is no time bound.
    SleepAndWaitForIo(Option<Duration>),
}
35
/// What to do with the thread-local statics of a thread when that thread terminates.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TlsAllocAction {
    /// Deallocate the memory backing the thread's thread-local statics.
    Deallocate,
    /// Keep the memory alive and record the allocations as static roots of the machine.
    Leak,
}
45
/// Passed to an unblock callback to tell it why the thread is being woken up.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UnblockKind {
    /// The thread was unblocked explicitly (this is what `unblock_thread` passes).
    Ready,
    /// The deadline the thread was blocked with has expired.
    TimedOut,
}
54
/// The callback stored with a blocked thread. It is invoked with an [`UnblockKind`] when the
/// thread gets unblocked.
pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
58
/// Identifier of a thread of the interpreted program (a thin wrapper around `u32`).
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(u32);

impl ThreadId {
    /// The identifier of the main thread, which is always thread 0.
    pub const MAIN_THREAD: ThreadId = ThreadId(0);

    /// Wraps a raw `u32` as a thread id. No check is made that such a thread exists.
    pub fn new_unchecked(id: u32) -> Self {
        ThreadId(id)
    }

    /// Returns the raw numeric value of this thread id.
    pub fn to_u32(self) -> u32 {
        let ThreadId(raw) = self;
        raw
    }
}
75
76impl Idx for ThreadId {
77 fn new(idx: usize) -> Self {
78 ThreadId(u32::try_from(idx).unwrap())
79 }
80
81 fn index(self) -> usize {
82 usize::try_from(self.0).unwrap()
83 }
84}
85
86impl From<ThreadId> for u64 {
87 fn from(t: ThreadId) -> Self {
88 t.0.into()
89 }
90}
91
/// Why a thread is blocked. `unblock_thread` asserts that the reason given by the caller
/// matches the reason stored with the blocked thread.
///
/// NOTE(review): only `Join`, `Epoll`, `IO` and `Genmc` are produced or inspected in this
/// file. The descriptions of the other variants follow from their names; confirm them
/// against the call sites that block with these reasons.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockReason {
    /// Waiting for the given thread to terminate; woken up by `terminate_active_thread`.
    Join(ThreadId),
    /// Presumably: sleeping until a deadline passes.
    Sleep,
    /// Presumably: waiting to acquire a mutex.
    Mutex,
    /// Presumably: waiting on a condition variable.
    Condvar,
    /// Presumably: waiting to acquire a reader-writer lock.
    RwLock,
    /// Presumably: waiting on a futex.
    Futex,
    /// Presumably: waiting for a one-time initialization to complete.
    InitOnce,
    /// Waiting on the given epoll instance. The scheduler treats such a thread as waiting
    /// for the host only if that epoll instance has host interests.
    Epoll { epfd: FileDescriptionRef<Epoll> },
    /// Presumably: waiting on an eventfd.
    Eventfd,
    /// Presumably: waiting on an emulated (virtual) socket.
    VirtualSocket,
    /// Waiting for I/O; the scheduler always treats this as waiting for the host.
    IO,
    /// Blocked on behalf of GenMC; implicitly unblocked when GenMC schedules the thread again.
    Genmc,
}
122
/// The scheduling state of a thread.
enum ThreadState<'tcx> {
    /// The thread can be picked by the scheduler.
    Enabled,
    /// The thread cannot run until it is unblocked, either explicitly (`reason` must then
    /// match) or because `deadline` expired. In both cases `callback` is invoked.
    Blocked { reason: BlockReason, deadline: Option<Deadline>, callback: DynUnblockCallback<'tcx> },
    /// The thread has finished; set by `terminate_active_thread` once its stack is empty.
    Terminated,
}
133
134impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Enabled => write!(f, "Enabled"),
138 Self::Blocked { reason, deadline, .. } =>
139 f.debug_struct("Blocked")
140 .field("reason", reason)
141 .field("deadline", deadline)
142 .finish(),
143 Self::Terminated => write!(f, "Terminated"),
144 }
145 }
146}
147
148impl<'tcx> ThreadState<'tcx> {
149 fn is_enabled(&self) -> bool {
150 matches!(self, ThreadState::Enabled)
151 }
152
153 fn is_terminated(&self) -> bool {
154 matches!(self, ThreadState::Terminated)
155 }
156
157 fn is_blocked_on(&self, reason: &BlockReason) -> bool {
158 matches!(self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
159 }
160}
161
/// Tracks whether a thread can still be joined or detached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// The initial status of every thread: it may still be joined or detached.
    Joinable,
    /// The thread was detached. Joining or detaching it again is reported as
    /// undefined behavior.
    Detached,
    /// Some thread has joined (or is blocked waiting to join) this thread.
    Joined,
}
173
/// A single thread of the interpreted program.
pub struct Thread<'tcx> {
    /// Scheduling state (enabled, blocked or terminated).
    state: ThreadState<'tcx>,

    /// Name of the thread as raw bytes; `None` if no name was set. The bytes are decoded
    /// lossily as UTF-8 when the name is displayed.
    thread_name: Option<Vec<u8>>,

    /// The virtual call stack of this thread.
    stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,

    /// A span associated with this thread. It starts out as `DUMMY_SP` and is reset to
    /// `DUMMY_SP` each time the `on_stack_empty` callback runs.
    /// NOTE(review): presumably the span where the thread was spawned — it is set outside
    /// this file; confirm.
    pub(crate) origin_span: Span,

    /// Callback invoked when the stack of this thread is empty. It is temporarily taken out
    /// of this field (leaving `None`) while it runs.
    pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,

    /// Cached index into `stack` of the frame considered most relevant to the user.
    /// When `None`, `top_user_relevant_frame()` falls back to the topmost frame.
    top_user_relevant_frame: Option<usize>,

    /// Whether this thread can still be joined or detached.
    join_status: ThreadJoinStatus,

    /// NOTE(review): presumably the payloads of in-flight unwinds (panics) of this thread —
    /// only visited for provenance in this file; confirm against the unwinding shims.
    pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,

    /// NOTE(review): presumably the place holding the thread-local "last error" value —
    /// only visited for provenance in this file; confirm.
    pub(crate) last_error: Option<MPlaceTy<'tcx>>,
}
216
/// Callback invoked by `miri_step` when the active thread cannot take a step.
/// Returning `Poll::Ready(())` makes the caller terminate the thread; returning
/// `Poll::Pending` keeps the thread alive so the callback can run again later.
pub type StackEmptyCallback<'tcx> =
    Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
219
220impl<'tcx> Thread<'tcx> {
221 fn thread_name(&self) -> Option<&[u8]> {
223 self.thread_name.as_deref()
224 }
225
226 pub fn is_enabled(&self) -> bool {
228 self.state.is_enabled()
229 }
230
231 fn thread_display_name(&self, id: ThreadId) -> String {
233 if let Some(ref thread_name) = self.thread_name {
234 String::from_utf8_lossy(thread_name).into_owned()
235 } else {
236 format!("unnamed-{}", id.index())
237 }
238 }
239
240 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
246 let mut best = None;
248 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
249 let relevance = frame.extra.user_relevance;
250 if relevance == u8::MAX {
251 return Some(idx);
253 }
254 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
255 best = Some((idx, relevance));
258 }
259 }
260 best.map(|(idx, _relevance)| idx)
261 }
262
263 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
266 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
267 }
268
269 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
272 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
273 self.top_user_relevant_frame = Some(frame_idx);
274 }
275
276 pub fn top_user_relevant_frame(&self) -> Option<usize> {
279 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
283 }
284
285 pub fn current_user_relevance(&self) -> u8 {
286 self.top_user_relevant_frame()
287 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
288 .unwrap_or(0)
289 }
290
291 pub fn current_user_relevant_span(&self) -> Span {
292 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
293 self.top_user_relevant_frame()
294 .map(|frame_idx| self.stack[frame_idx].current_span())
295 .unwrap_or(rustc_span::DUMMY_SP)
296 }
297}
298
299impl<'tcx> std::fmt::Debug for Thread<'tcx> {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 write!(
302 f,
303 "{}({:?}, {:?})",
304 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
305 self.state,
306 self.join_status
307 )
308 }
309}
310
311impl<'tcx> Thread<'tcx> {
312 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
313 Self {
314 state: ThreadState::Enabled,
315 thread_name: name.map(|name| Vec::from(name.as_bytes())),
316 stack: Vec::new(),
317 origin_span: DUMMY_SP,
318 top_user_relevant_frame: None,
319 join_status: ThreadJoinStatus::Joinable,
320 unwind_payloads: Vec::new(),
321 last_error: None,
322 on_stack_empty,
323 }
324 }
325}
326
327impl VisitProvenance for Thread<'_> {
328 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
329 let Thread {
330 unwind_payloads: panic_payload,
331 last_error,
332 stack,
333 origin_span: _,
334 top_user_relevant_frame: _,
335 state: _,
336 thread_name: _,
337 join_status: _,
338 on_stack_empty: _, } = self;
340
341 for payload in panic_payload {
342 payload.visit_provenance(visit);
343 }
344 last_error.visit_provenance(visit);
345 for frame in stack {
346 frame.visit_provenance(visit)
347 }
348 }
349}
350
351impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
352 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
353 let return_place = self.return_place();
354 let Frame {
355 locals,
356 extra,
357 ..
359 } = self;
360
361 return_place.visit_provenance(visit);
363 for local in locals.iter() {
365 match local.as_mplace_or_imm() {
366 None => {}
367 Some(Either::Left((ptr, meta))) => {
368 ptr.visit_provenance(visit);
369 meta.visit_provenance(visit);
370 }
371 Some(Either::Right(imm)) => {
372 imm.visit_provenance(visit);
373 }
374 }
375 }
376
377 extra.visit_provenance(visit);
378 }
379}
380
/// The error returned by `thread_id_try_from` when a raw id cannot be turned into the id
/// of a live thread.
#[derive(Debug, Copy, Clone)]
pub enum ThreadLookupError {
    /// The value does not fit into a `u32` or no thread with that index was ever created.
    InvalidId,
    /// The id refers to a thread that exists but has already terminated.
    Terminated(ThreadId),
}
389
/// The set of all threads of the interpreted program, plus the scheduler's bookkeeping.
#[derive(Debug)]
pub struct ThreadManager<'tcx> {
    /// Identifier of the thread that is currently executing.
    active_thread: ThreadId,
    /// All threads created so far, indexed by their `ThreadId`. Nothing in this file ever
    /// removes an entry; terminated threads are only marked as `Terminated`.
    threads: IndexVec<ThreadId, Thread<'tcx>>,
    /// Maps a thread-local static and a thread to the pointer of that thread's instance of
    /// the static. Entries of a thread are removed when it terminates.
    thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
    /// Set when the active thread asked to yield; reset by the scheduler once it has
    /// picked the next thread.
    yield_active_thread: bool,
    /// Copied from `MiriConfig::fixed_scheduling`. When set, the scheduler picks the next
    /// enabled thread in round-robin order instead of randomly, and random preemption
    /// is disabled.
    fixed_scheduling: bool,
}
407
408impl VisitProvenance for ThreadManager<'_> {
409 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
410 let ThreadManager {
411 threads,
412 thread_local_allocs,
413 active_thread: _,
414 yield_active_thread: _,
415 fixed_scheduling: _,
416 } = self;
417
418 for thread in threads {
419 thread.visit_provenance(visit);
420 }
421 for ptr in thread_local_allocs.values() {
422 ptr.visit_provenance(visit);
423 }
424 }
425}
426
427impl<'tcx> ThreadManager<'tcx> {
428 pub(crate) fn new(config: &MiriConfig) -> Self {
429 let mut threads = IndexVec::new();
430 threads.push(Thread::new(Some("main"), None));
432 Self {
433 active_thread: ThreadId::MAIN_THREAD,
434 threads,
435 thread_local_allocs: Default::default(),
436 yield_active_thread: false,
437 fixed_scheduling: config.fixed_scheduling,
438 }
439 }
440
441 pub(crate) fn init(
442 ecx: &mut MiriInterpCx<'tcx>,
443 on_main_stack_empty: StackEmptyCallback<'tcx>,
444 ) {
445 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
446 Some(on_main_stack_empty);
447 if ecx.tcx.sess.target.os != Os::Windows {
448 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
450 ThreadJoinStatus::Detached;
451 }
452 }
453
454 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
458 if let Ok(id) = id.try_into()
459 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
460 {
461 let thread_id = ThreadId(id);
462 if self.threads[thread_id].state.is_terminated() {
463 Err(ThreadLookupError::Terminated(thread_id))
464 } else {
465 Ok(thread_id)
466 }
467 } else {
468 Err(ThreadLookupError::InvalidId)
469 }
470 }
471
472 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
475 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
476 }
477
478 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
483 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
484 }
485
486 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
488 &self.threads[self.active_thread].stack
489 }
490
491 pub fn active_thread_stack_mut(
493 &mut self,
494 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
495 &mut self.threads[self.active_thread].stack
496 }
497
498 pub fn all_blocked_stacks(
499 &self,
500 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
501 self.threads
502 .iter_enumerated()
503 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
504 .map(|(id, t)| (id, &t.stack[..]))
505 }
506
507 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
509 let new_thread_id = ThreadId::new(self.threads.len());
510 self.threads.push(Thread::new(None, Some(on_stack_empty)));
511 new_thread_id
512 }
513
514 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
516 assert!(id.index() < self.threads.len());
517 info!(
518 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
519 self.get_thread_display_name(id),
520 self.get_thread_display_name(self.active_thread)
521 );
522 std::mem::replace(&mut self.active_thread, id)
523 }
524
525 pub fn active_thread(&self) -> ThreadId {
527 self.active_thread
528 }
529
530 pub fn get_total_thread_count(&self) -> usize {
532 self.threads.len()
533 }
534
535 pub fn get_live_thread_count(&self) -> usize {
538 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
539 }
540
541 fn has_terminated(&self, thread_id: ThreadId) -> bool {
543 self.threads[thread_id].state.is_terminated()
544 }
545
546 fn have_all_terminated(&self) -> bool {
548 self.threads.iter().all(|thread| thread.state.is_terminated())
549 }
550
551 fn enable_thread(&mut self, thread_id: ThreadId) {
553 assert!(self.has_terminated(thread_id));
554 self.threads[thread_id].state = ThreadState::Enabled;
555 }
556
557 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
559 &mut self.threads[self.active_thread]
560 }
561
562 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
564 &self.threads[self.active_thread]
565 }
566
567 pub fn thread_ref(&self, thread_id: ThreadId) -> &Thread<'tcx> {
568 &self.threads[thread_id]
569 }
570
571 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
580 trace!("detaching {:?}", id);
582
583 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
584 self.threads[id].join_status == ThreadJoinStatus::Detached
586 } else {
587 self.threads[id].join_status != ThreadJoinStatus::Joinable
588 };
589 if is_ub {
590 throw_ub_format!("trying to detach thread that was already detached or joined");
591 }
592
593 self.threads[id].join_status = ThreadJoinStatus::Detached;
594 interp_ok(())
595 }
596
597 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
599 self.threads[thread].thread_name = Some(new_thread_name);
600 }
601
602 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
604 self.threads[thread].thread_name()
605 }
606
607 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
608 self.threads[thread].thread_display_name(thread)
609 }
610
611 fn block_thread(
613 &mut self,
614 reason: BlockReason,
615 deadline: Option<Deadline>,
616 callback: DynUnblockCallback<'tcx>,
617 ) {
618 let state = &mut self.threads[self.active_thread].state;
619 assert!(state.is_enabled());
620 *state = ThreadState::Blocked { reason, deadline, callback }
621 }
622
623 fn yield_active_thread(&mut self) {
625 self.yield_active_thread = true;
629 }
630}
631
impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
/// Scheduler internals that are private to this module.
trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
    /// Runs the `on_stack_empty` callback of the active thread and returns its result.
    ///
    /// # Panics
    /// Panics if the callback is not set, which is also the case while it is already running.
    #[inline]
    fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
        let this = self.eval_context_mut();
        let active_thread = this.active_thread_mut();
        active_thread.origin_span = DUMMY_SP;
        // Take the callback out of the thread so that it can be handed `this` mutably.
        let mut callback = active_thread
            .on_stack_empty
            .take()
            .expect("`on_stack_empty` not set up, or already running");
        let res = callback(this)?;
        // Put the callback back for the next invocation. If the callback returned an error,
        // the `?` above has already returned and the callback is not restored.
        this.active_thread_mut().on_stack_empty = Some(callback);
        interp_ok(res)
    }

    /// Decides what the main loop should do next and, if needed, switches the active thread.
    ///
    /// Outside of GenMC mode, the active thread keeps running unless it is no longer enabled
    /// or has yielded. Otherwise pending host I/O is polled (if communication is enabled),
    /// threads with expired deadlines are unblocked, and a new enabled thread is picked.
    /// If no thread is enabled, the result is either a request to sleep/wait or a
    /// `GlobalDeadlock` machine stop.
    fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
        let this = self.eval_context_mut();

        // In GenMC mode, GenMC alone decides which thread runs next.
        if this.machine.data_race.as_genmc_ref().is_some() {
            loop {
                let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
                // If GenMC names no thread, keep executing on the current active thread.
                let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
                    return interp_ok(SchedulingAction::ExecuteStep);
                };
                // A thread blocked with `BlockReason::Genmc` is implicitly unblocked when
                // GenMC schedules it again.
                if this.machine.threads.threads[next_thread_id]
                    .state
                    .is_blocked_on(&BlockReason::Genmc)
                {
                    info!(
                        "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
                    );
                    this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
                }
                // Only switch if the chosen thread is enabled at this point; otherwise ask
                // GenMC again in the next loop iteration.
                let thread_manager = &mut this.machine.threads;
                if thread_manager.threads[next_thread_id].state.is_enabled() {
                    thread_manager.active_thread = next_thread_id;
                    return interp_ok(SchedulingAction::ExecuteStep);
                }
            }
        }

        // Fast path: the active thread can continue and did not ask to yield.
        let thread_manager = &this.machine.threads;
        if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
            && !thread_manager.yield_active_thread
        {
            return interp_ok(SchedulingAction::ExecuteStep);
        }

        // NOTE(review): presumably a zero timeout makes this a non-blocking poll — confirm
        // against `poll_and_unblock`.
        if this.machine.communicate() {
            this.poll_and_unblock(Some(Duration::ZERO))?;
        }

        // Wake up all threads whose deadline has passed; the result is the shortest
        // remaining wait among the threads that are still blocked with a deadline.
        let potential_sleep_time = this.unblock_expired_deadlines()?;

        let thread_manager = &mut this.machine.threads;
        let rng = this.machine.rng.get_mut();

        // Candidates are all enabled threads, visited starting with the thread after the
        // active one and wrapping around so that the active thread itself comes last.
        let mut threads_iter = thread_manager
            .threads
            .iter_enumerated()
            .skip(thread_manager.active_thread.index() + 1)
            .chain(
                thread_manager
                    .threads
                    .iter_enumerated()
                    .take(thread_manager.active_thread.index() + 1),
            )
            .filter(|(_id, thread)| thread.state.is_enabled());
        // With fixed scheduling take the first candidate, otherwise choose one at random.
        let new_thread = if thread_manager.fixed_scheduling {
            let next = threads_iter.next();
            drop(threads_iter);
            next
        } else {
            threads_iter.choose(rng)
        };

        if let Some((id, _thread)) = new_thread {
            if thread_manager.active_thread != id {
                info!(
                    "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
                    thread_manager.get_thread_display_name(id),
                    thread_manager.get_thread_display_name(thread_manager.active_thread)
                );
                thread_manager.active_thread = id;
            }
        }
        // The yield request (if any) has been honored by the selection above.
        thread_manager.yield_active_thread = false;

        if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
            return interp_ok(SchedulingAction::ExecuteStep);
        }

        // No thread is enabled. Figure out whether waiting can still lead to progress.
        let threads = &this.machine.threads.threads;

        if threads.iter().all(|thread| thread.state.is_terminated()) {
            unreachable!("all threads terminated without the main thread terminating?!");
        } else if let Some(sleep_time) = potential_sleep_time {
            // Some thread is blocked with a deadline: wait at most until it expires.
            interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
        } else if threads.iter().any(|thread| this.is_thread_blocked_on_host(thread)) {
            // Some thread waits for the host: wait without a time bound.
            interp_ok(SchedulingAction::SleepAndWaitForIo(None))
        } else {
            // Every remaining thread is blocked and nothing can wake any of them up.
            throw_machine_stop!(TerminationInfo::GlobalDeadlock);
        }
    }

    /// Whether `thread` is blocked waiting for the host: either blocked with
    /// `BlockReason::IO`, or blocked on an epoll instance that has host interests.
    fn is_thread_blocked_on_host(&self, thread: &Thread<'tcx>) -> bool {
        let this = self.eval_context_ref();
        match &thread.state {
            ThreadState::Blocked { reason: BlockReason::IO, .. } => true,
            ThreadState::Blocked { reason: BlockReason::Epoll { epfd }, .. } =>
                this.has_epoll_host_interests(epfd),
            _ => false,
        }
    }

    /// Enables every blocked thread whose deadline has passed and runs its unblock callback
    /// with `UnblockKind::TimedOut`.
    ///
    /// Returns the shortest remaining wait time among the threads that stay blocked with a
    /// deadline, or `None` if there is no such thread.
    ///
    /// # Panics
    /// Panics if a thread has a `RealTime` deadline while communication with the host
    /// is disabled.
    fn unblock_expired_deadlines(&mut self) -> InterpResult<'tcx, Option<Duration>> {
        let this = self.eval_context_mut();
        let communicate = this.machine.communicate();

        let mut min_wait_time = Option::<Duration>::None;
        let mut callbacks = Vec::new();

        for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
            match &thread.state {
                ThreadState::Blocked { deadline: Some(deadline), .. } => {
                    // Time left until the deadline.
                    let wait_time = match deadline {
                        Deadline::Monotonic(instant) =>
                            instant.duration_since(this.machine.monotonic_clock.now()),
                        Deadline::RealTime(time) => {
                            assert!(communicate, "cannot have `RealTime` timeout with isolation");
                            // A deadline in the past counts as zero time left.
                            time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)
                        }
                    };

                    if wait_time.is_zero() {
                        // Deadline expired: enable the thread now and remember its callback.
                        let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
                        let ThreadState::Blocked { callback, .. } = old_state else {
                            unreachable!()
                        };
                        callbacks.push((id, callback));
                    } else {
                        // Still waiting: keep track of the minimum over all such threads.
                        min_wait_time = Some(wait_time.min(min_wait_time.unwrap_or(Duration::MAX)));
                    }
                }
                _ => {}
            }
        }

        // Run the callbacks only after the loop, since they need mutable access to `this`.
        for (thread, callback) in callbacks {
            // Each callback runs with its own thread as the active thread.
            let old_thread = this.machine.threads.set_active_thread_id(thread);
            callback.call(this, UnblockKind::TimedOut)?;
            this.machine.threads.set_active_thread_id(old_thread);
        }

        interp_ok(min_wait_time)
    }
}
855
856impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
858pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
859 fn miri_step(&mut self) -> InterpResult<'tcx> {
861 let this = self.eval_context_mut();
862
863 if !this.step()? {
864 match this.run_on_stack_empty()? {
866 Poll::Pending => {} Poll::Ready(()) => {
868 this.terminate_active_thread(TlsAllocAction::Deallocate)?;
869 }
870 }
871 }
872
873 interp_ok(())
874 }
875
876 #[inline]
877 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
878 self.eval_context_ref().machine.threads.thread_id_try_from(id)
879 }
880
881 fn get_or_create_thread_local_alloc(
884 &mut self,
885 def_id: DefId,
886 ) -> InterpResult<'tcx, StrictPointer> {
887 let this = self.eval_context_mut();
888 let tcx = this.tcx;
889 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
890 interp_ok(old_alloc)
893 } else {
894 if tcx.is_foreign_item(def_id) {
898 throw_unsup_format!("foreign thread-local statics are not supported");
899 }
900 let params = this.machine.get_default_alloc_params();
901 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
902 let mut alloc = alloc.inner().adjust_from_tcx(
904 &this.tcx,
905 |bytes, align| {
906 interp_ok(MiriAllocBytes::from_bytes(
907 std::borrow::Cow::Borrowed(bytes),
908 align,
909 params,
910 ))
911 },
912 |ptr| this.global_root_pointer(ptr),
913 )?;
914 alloc.mutability = Mutability::Mut;
916 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
918 this.machine.threads.set_thread_local_alloc(def_id, ptr);
919 interp_ok(ptr)
920 }
921 }
922
923 #[inline]
925 fn start_regular_thread(
926 &mut self,
927 thread: Option<MPlaceTy<'tcx>>,
928 start_routine: Pointer,
929 start_abi: ExternAbi,
930 func_arg: ImmTy<'tcx>,
931 ret_layout: TyAndLayout<'tcx>,
932 ) -> InterpResult<'tcx, ThreadId> {
933 let this = self.eval_context_mut();
934
935 let current_span = this.machine.current_user_relevant_span();
937 let new_thread_id = this.machine.threads.create_thread({
938 let mut state = tls::TlsDtorsState::default();
939 Box::new(move |m| state.on_stack_empty(m))
940 });
941 match &mut this.machine.data_race {
942 GlobalDataRaceHandler::None => {}
943 GlobalDataRaceHandler::Vclocks(data_race) =>
944 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
945 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
946 genmc_ctx.handle_thread_create(
947 &this.machine.threads,
948 start_routine,
949 &func_arg,
950 new_thread_id,
951 )?,
952 }
953 if let Some(thread_info_place) = thread {
956 this.write_scalar(
957 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
958 &thread_info_place,
959 )?;
960 }
961
962 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
965
966 if let Some(thread_cpu_affinity) = &mut this.machine.thread_cpu_affinity
969 && let Some(cpuset) = thread_cpu_affinity.get(&old_thread_id).cloned()
970 {
971 thread_cpu_affinity.insert(new_thread_id, cpuset);
972 }
973
974 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
976
977 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
981
982 this.call_thread_root_function(
983 instance,
984 start_abi,
985 &[func_arg],
986 Some(&ret_place),
987 current_span,
988 )?;
989
990 this.machine.threads.set_active_thread_id(old_thread_id);
992
993 interp_ok(new_thread_id)
994 }
995
996 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
1001 let this = self.eval_context_mut();
1002
1003 let thread = this.active_thread_mut();
1005 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
1006 thread.state = ThreadState::Terminated;
1007
1008 let gone_thread = this.active_thread();
1010 {
1011 let mut free_tls_statics = Vec::new();
1012 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
1013 if thread != gone_thread {
1014 return true;
1016 }
1017 free_tls_statics.push(alloc_id);
1020 false
1021 });
1022 for ptr in free_tls_statics {
1024 match tls_alloc_action {
1025 TlsAllocAction::Deallocate =>
1026 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
1027 TlsAllocAction::Leak =>
1028 if let Some(alloc) = ptr.provenance.get_alloc_id() {
1029 trace!(
1030 "Thread-local static leaked and stored as static root: {:?}",
1031 alloc
1032 );
1033 this.machine.static_roots.push(alloc);
1034 },
1035 }
1036 }
1037 }
1038
1039 match &mut this.machine.data_race {
1040 GlobalDataRaceHandler::None => {}
1041 GlobalDataRaceHandler::Vclocks(data_race) =>
1042 data_race.thread_terminated(&this.machine.threads),
1043 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
1044 genmc_ctx.handle_thread_finish(&this.machine.threads)
1047 }
1048 }
1049
1050 let unblock_reason = BlockReason::Join(gone_thread);
1052 let threads = &this.machine.threads.threads;
1053 let joining_threads = threads
1054 .iter_enumerated()
1055 .filter(|(_, thread)| thread.state.is_blocked_on(&unblock_reason))
1056 .map(|(id, _)| id)
1057 .collect::<Vec<_>>();
1058 for thread in joining_threads {
1059 this.unblock_thread(thread, unblock_reason.clone())?;
1060 }
1061
1062 interp_ok(())
1063 }
1064
1065 #[inline]
1068 fn block_thread(
1069 &mut self,
1070 reason: BlockReason,
1071 deadline: Option<Deadline>,
1072 callback: DynUnblockCallback<'tcx>,
1073 ) {
1074 let this = self.eval_context_mut();
1075 if deadline.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1076 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1077 }
1078 if matches!(deadline, Some(Deadline::RealTime(_))) && !this.machine.communicate() {
1079 panic!("cannot have `RealTime` timeout with isolation");
1080 }
1081 this.machine.threads.block_thread(reason, deadline, callback);
1082 }
1083
1084 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1087 let this = self.eval_context_mut();
1088 let old_state =
1089 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1090 let callback = match old_state {
1091 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1092 assert_eq!(
1093 reason, actual_reason,
1094 "unblock_thread: thread was blocked for the wrong reason"
1095 );
1096 callback
1097 }
1098 _ => panic!("unblock_thread: thread was not blocked"),
1099 };
1100 let old_thread = this.machine.threads.set_active_thread_id(thread);
1102 callback.call(this, UnblockKind::Ready)?;
1103 this.machine.threads.set_active_thread_id(old_thread);
1104 interp_ok(())
1105 }
1106
1107 #[inline]
1108 fn detach_thread(
1109 &mut self,
1110 thread_id: ThreadId,
1111 allow_terminated_joined: bool,
1112 ) -> InterpResult<'tcx> {
1113 let this = self.eval_context_mut();
1114 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1115 }
1116
1117 fn join_thread(
1121 &mut self,
1122 joined_thread_id: ThreadId,
1123 success_retval: Scalar,
1124 return_dest: &MPlaceTy<'tcx>,
1125 ) -> InterpResult<'tcx> {
1126 let this = self.eval_context_mut();
1127 let thread_mgr = &mut this.machine.threads;
1128 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1129 throw_ub_format!("trying to join a detached thread");
1131 }
1132
1133 fn after_join<'tcx>(
1134 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1135 joined_thread_id: ThreadId,
1136 success_retval: Scalar,
1137 return_dest: &MPlaceTy<'tcx>,
1138 ) -> InterpResult<'tcx> {
1139 let threads = &this.machine.threads;
1140 match &mut this.machine.data_race {
1141 GlobalDataRaceHandler::None => {}
1142 GlobalDataRaceHandler::Vclocks(data_race) =>
1143 data_race.thread_joined(threads, joined_thread_id),
1144 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1145 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1146 }
1147 this.write_scalar(success_retval, return_dest)?;
1148 interp_ok(())
1149 }
1150
1151 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1154 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1155 trace!(
1156 "{:?} blocked on {:?} when trying to join",
1157 thread_mgr.active_thread, joined_thread_id
1158 );
1159 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1160 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1161 }
1162
1163 let dest = return_dest.clone();
1166 thread_mgr.block_thread(
1167 BlockReason::Join(joined_thread_id),
1168 None,
1169 callback!(
1170 @capture<'tcx> {
1171 joined_thread_id: ThreadId,
1172 dest: MPlaceTy<'tcx>,
1173 success_retval: Scalar,
1174 }
1175 |this, unblock: UnblockKind| {
1176 assert_eq!(unblock, UnblockKind::Ready);
1177 after_join(this, joined_thread_id, success_retval, &dest)
1178 }
1179 ),
1180 );
1181 } else {
1182 after_join(this, joined_thread_id, success_retval, return_dest)?;
1184 }
1185 interp_ok(())
1186 }
1187
1188 fn join_thread_exclusive(
1193 &mut self,
1194 joined_thread_id: ThreadId,
1195 success_retval: Scalar,
1196 return_dest: &MPlaceTy<'tcx>,
1197 ) -> InterpResult<'tcx> {
1198 let this = self.eval_context_mut();
1199 let threads = &this.machine.threads.threads;
1200 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1201 throw_ub_format!("trying to join an already joined thread");
1202 }
1203
1204 if joined_thread_id == this.machine.threads.active_thread {
1205 throw_ub_format!("trying to join itself");
1206 }
1207
1208 assert!(
1210 threads.iter().all(|thread| {
1211 !thread.state.is_blocked_on(&BlockReason::Join(joined_thread_id))
1212 }),
1213 "this thread already has threads waiting for its termination"
1214 );
1215
1216 this.join_thread(joined_thread_id, success_retval, return_dest)
1217 }
1218
1219 #[inline]
1220 fn active_thread(&self) -> ThreadId {
1221 let this = self.eval_context_ref();
1222 this.machine.threads.active_thread()
1223 }
1224
1225 #[inline]
1226 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1227 let this = self.eval_context_mut();
1228 this.machine.threads.active_thread_mut()
1229 }
1230
1231 #[inline]
1232 fn active_thread_ref(&self) -> &Thread<'tcx> {
1233 let this = self.eval_context_ref();
1234 this.machine.threads.active_thread_ref()
1235 }
1236
1237 #[inline]
1238 fn get_total_thread_count(&self) -> usize {
1239 let this = self.eval_context_ref();
1240 this.machine.threads.get_total_thread_count()
1241 }
1242
1243 #[inline]
1244 fn have_all_terminated(&self) -> bool {
1245 let this = self.eval_context_ref();
1246 this.machine.threads.have_all_terminated()
1247 }
1248
1249 #[inline]
1250 fn enable_thread(&mut self, thread_id: ThreadId) {
1251 let this = self.eval_context_mut();
1252 this.machine.threads.enable_thread(thread_id);
1253 }
1254
1255 #[inline]
1256 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1257 let this = self.eval_context_ref();
1258 this.machine.threads.active_thread_stack()
1259 }
1260
1261 #[inline]
1262 fn active_thread_stack_mut<'a>(
1263 &'a mut self,
1264 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1265 let this = self.eval_context_mut();
1266 this.machine.threads.active_thread_stack_mut()
1267 }
1268
1269 #[inline]
1271 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1272 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1273 }
1274
1275 #[inline]
1276 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1277 where
1278 'tcx: 'c,
1279 {
1280 self.eval_context_ref().machine.threads.get_thread_name(thread)
1281 }
1282
1283 #[inline]
1284 fn yield_active_thread(&mut self) {
1285 self.eval_context_mut().machine.threads.yield_active_thread();
1286 }
1287
1288 #[inline]
1289 fn maybe_preempt_active_thread(&mut self) {
1290 let this = self.eval_context_mut();
1291 if !this.machine.threads.fixed_scheduling
1292 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1293 {
1294 this.yield_active_thread();
1295 }
1296 }
1297
1298 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1301 let this = self.eval_context_mut();
1302 loop {
1303 if CTRL_C_RECEIVED.load(Relaxed) {
1304 this.machine.handle_abnormal_termination();
1305 throw_machine_stop!(TerminationInfo::Interrupted);
1306 }
1307 match this.schedule()? {
1308 SchedulingAction::ExecuteStep => {
1309 this.miri_step()?;
1310 }
1311 SchedulingAction::SleepAndWaitForIo(duration) => {
1312 if this.machine.communicate() {
1313 this.poll_and_unblock(duration)?;
1318 } else {
1319 let duration = duration.expect(
1320 "Infinite sleep should not be triggered when isolation is enabled",
1321 );
1322 this.machine.monotonic_clock.sleep(duration);
1323 }
1324 }
1325 }
1326 }
1327 }
1328}