1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use rand::seq::IteratorRandom;
9use rustc_abi::ExternAbi;
10use rustc_const_eval::CTRL_C_RECEIVED;
11use rustc_data_structures::either::Either;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::{DUMMY_SP, Span};
18use rustc_target::spec::Os;
19
20use crate::concurrency::GlobalDataRaceHandler;
21use crate::shims::tls;
22use crate::*;
23
24#[derive(Clone, Copy, Debug, PartialEq)]
25enum SchedulingAction {
26 ExecuteStep,
28 ExecuteTimeoutCallback,
30 Sleep(Duration),
32}
33
34#[derive(Clone, Copy, Debug, PartialEq)]
36pub enum TlsAllocAction {
37 Deallocate,
39 Leak,
42}
43
44#[derive(Clone, Copy, Debug, PartialEq)]
46pub enum UnblockKind {
47 Ready,
49 TimedOut,
51}
52
53pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
56
57#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
59pub struct ThreadId(u32);
60
61impl ThreadId {
62 pub fn to_u32(self) -> u32 {
63 self.0
64 }
65
66 pub fn new_unchecked(id: u32) -> Self {
68 Self(id)
69 }
70
71 pub const MAIN_THREAD: ThreadId = ThreadId(0);
72}
73
74impl Idx for ThreadId {
75 fn new(idx: usize) -> Self {
76 ThreadId(u32::try_from(idx).unwrap())
77 }
78
79 fn index(self) -> usize {
80 usize::try_from(self.0).unwrap()
81 }
82}
83
84impl From<ThreadId> for u64 {
85 fn from(t: ThreadId) -> Self {
86 t.0.into()
87 }
88}
89
90#[derive(Debug, Copy, Clone, PartialEq, Eq)]
92pub enum BlockReason {
93 Join(ThreadId),
96 Sleep,
98 Mutex,
100 Condvar,
102 RwLock,
104 Futex,
106 InitOnce,
108 Epoll,
110 Eventfd,
112 UnnamedSocket,
114 Genmc,
117}
118
119enum ThreadState<'tcx> {
121 Enabled,
123 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
125 Terminated,
128}
129
130impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 Self::Enabled => write!(f, "Enabled"),
134 Self::Blocked { reason, timeout, .. } =>
135 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
136 Self::Terminated => write!(f, "Terminated"),
137 }
138 }
139}
140
141impl<'tcx> ThreadState<'tcx> {
142 fn is_enabled(&self) -> bool {
143 matches!(self, ThreadState::Enabled)
144 }
145
146 fn is_terminated(&self) -> bool {
147 matches!(self, ThreadState::Terminated)
148 }
149
150 fn is_blocked_on(&self, reason: BlockReason) -> bool {
151 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
152 }
153}
154
155#[derive(Debug, Copy, Clone, PartialEq, Eq)]
157enum ThreadJoinStatus {
158 Joinable,
160 Detached,
163 Joined,
165}
166
167pub struct Thread<'tcx> {
169 state: ThreadState<'tcx>,
170
171 thread_name: Option<Vec<u8>>,
173
174 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
176
177 pub(crate) origin_span: Span,
180
181 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
186
187 top_user_relevant_frame: Option<usize>,
192
193 join_status: ThreadJoinStatus,
195
196 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
205
206 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
208}
209
210pub type StackEmptyCallback<'tcx> =
211 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
212
213impl<'tcx> Thread<'tcx> {
214 fn thread_name(&self) -> Option<&[u8]> {
216 self.thread_name.as_deref()
217 }
218
219 pub fn is_enabled(&self) -> bool {
221 self.state.is_enabled()
222 }
223
224 fn thread_display_name(&self, id: ThreadId) -> String {
226 if let Some(ref thread_name) = self.thread_name {
227 String::from_utf8_lossy(thread_name).into_owned()
228 } else {
229 format!("unnamed-{}", id.index())
230 }
231 }
232
233 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
239 let mut best = None;
241 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
242 let relevance = frame.extra.user_relevance;
243 if relevance == u8::MAX {
244 return Some(idx);
246 }
247 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
248 best = Some((idx, relevance));
251 }
252 }
253 best.map(|(idx, _relevance)| idx)
254 }
255
256 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
259 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
260 }
261
262 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
265 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
266 self.top_user_relevant_frame = Some(frame_idx);
267 }
268
269 pub fn top_user_relevant_frame(&self) -> Option<usize> {
272 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
276 }
277
278 pub fn current_user_relevance(&self) -> u8 {
279 self.top_user_relevant_frame()
280 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
281 .unwrap_or(0)
282 }
283
284 pub fn current_user_relevant_span(&self) -> Span {
285 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
286 self.top_user_relevant_frame()
287 .map(|frame_idx| self.stack[frame_idx].current_span())
288 .unwrap_or(rustc_span::DUMMY_SP)
289 }
290}
291
292impl<'tcx> std::fmt::Debug for Thread<'tcx> {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 write!(
295 f,
296 "{}({:?}, {:?})",
297 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
298 self.state,
299 self.join_status
300 )
301 }
302}
303
304impl<'tcx> Thread<'tcx> {
305 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
306 Self {
307 state: ThreadState::Enabled,
308 thread_name: name.map(|name| Vec::from(name.as_bytes())),
309 stack: Vec::new(),
310 origin_span: DUMMY_SP,
311 top_user_relevant_frame: None,
312 join_status: ThreadJoinStatus::Joinable,
313 unwind_payloads: Vec::new(),
314 last_error: None,
315 on_stack_empty,
316 }
317 }
318}
319
320impl VisitProvenance for Thread<'_> {
321 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
322 let Thread {
323 unwind_payloads: panic_payload,
324 last_error,
325 stack,
326 origin_span: _,
327 top_user_relevant_frame: _,
328 state: _,
329 thread_name: _,
330 join_status: _,
331 on_stack_empty: _, } = self;
333
334 for payload in panic_payload {
335 payload.visit_provenance(visit);
336 }
337 last_error.visit_provenance(visit);
338 for frame in stack {
339 frame.visit_provenance(visit)
340 }
341 }
342}
343
344impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
345 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
346 let Frame {
347 return_place,
348 locals,
349 extra,
350 ..
352 } = self;
353
354 return_place.visit_provenance(visit);
356 for local in locals.iter() {
358 match local.as_mplace_or_imm() {
359 None => {}
360 Some(Either::Left((ptr, meta))) => {
361 ptr.visit_provenance(visit);
362 meta.visit_provenance(visit);
363 }
364 Some(Either::Right(imm)) => {
365 imm.visit_provenance(visit);
366 }
367 }
368 }
369
370 extra.visit_provenance(visit);
371 }
372}
373
374#[derive(Debug)]
376enum Timeout {
377 Monotonic(Instant),
378 RealTime(SystemTime),
379}
380
381impl Timeout {
382 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
384 match self {
385 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
386 Timeout::RealTime(time) =>
387 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
388 }
389 }
390
391 fn add_lossy(&self, duration: Duration) -> Self {
393 match self {
394 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
395 Timeout::RealTime(s) => {
396 Timeout::RealTime(
398 s.checked_add(duration)
399 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
400 )
401 }
402 }
403 }
404}
405
406#[derive(Debug, Copy, Clone, PartialEq)]
408pub enum TimeoutClock {
409 Monotonic,
410 RealTime,
411}
412
413#[derive(Debug, Copy, Clone)]
415pub enum TimeoutAnchor {
416 Relative,
417 Absolute,
418}
419
420#[derive(Debug, Copy, Clone)]
422pub struct ThreadNotFound;
423
424#[derive(Debug)]
426pub struct ThreadManager<'tcx> {
427 active_thread: ThreadId,
429 threads: IndexVec<ThreadId, Thread<'tcx>>,
433 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
435 yield_active_thread: bool,
438 fixed_scheduling: bool,
440}
441
442impl VisitProvenance for ThreadManager<'_> {
443 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
444 let ThreadManager {
445 threads,
446 thread_local_allocs,
447 active_thread: _,
448 yield_active_thread: _,
449 fixed_scheduling: _,
450 } = self;
451
452 for thread in threads {
453 thread.visit_provenance(visit);
454 }
455 for ptr in thread_local_allocs.values() {
456 ptr.visit_provenance(visit);
457 }
458 }
459}
460
461impl<'tcx> ThreadManager<'tcx> {
462 pub(crate) fn new(config: &MiriConfig) -> Self {
463 let mut threads = IndexVec::new();
464 threads.push(Thread::new(Some("main"), None));
466 Self {
467 active_thread: ThreadId::MAIN_THREAD,
468 threads,
469 thread_local_allocs: Default::default(),
470 yield_active_thread: false,
471 fixed_scheduling: config.fixed_scheduling,
472 }
473 }
474
475 pub(crate) fn init(
476 ecx: &mut MiriInterpCx<'tcx>,
477 on_main_stack_empty: StackEmptyCallback<'tcx>,
478 ) {
479 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
480 Some(on_main_stack_empty);
481 if ecx.tcx.sess.target.os != Os::Windows {
482 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
484 ThreadJoinStatus::Detached;
485 }
486 }
487
488 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
489 if let Ok(id) = id.try_into()
490 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
491 {
492 Ok(ThreadId(id))
493 } else {
494 Err(ThreadNotFound)
495 }
496 }
497
498 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
501 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
502 }
503
504 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
509 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
510 }
511
512 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
514 &self.threads[self.active_thread].stack
515 }
516
517 pub fn active_thread_stack_mut(
519 &mut self,
520 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
521 &mut self.threads[self.active_thread].stack
522 }
523
524 pub fn all_blocked_stacks(
525 &self,
526 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
527 self.threads
528 .iter_enumerated()
529 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
530 .map(|(id, t)| (id, &t.stack[..]))
531 }
532
533 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
535 let new_thread_id = ThreadId::new(self.threads.len());
536 self.threads.push(Thread::new(None, Some(on_stack_empty)));
537 new_thread_id
538 }
539
540 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
542 assert!(id.index() < self.threads.len());
543 info!(
544 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
545 self.get_thread_display_name(id),
546 self.get_thread_display_name(self.active_thread)
547 );
548 std::mem::replace(&mut self.active_thread, id)
549 }
550
551 pub fn active_thread(&self) -> ThreadId {
553 self.active_thread
554 }
555
556 pub fn get_total_thread_count(&self) -> usize {
558 self.threads.len()
559 }
560
561 pub fn get_live_thread_count(&self) -> usize {
564 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
565 }
566
567 fn has_terminated(&self, thread_id: ThreadId) -> bool {
569 self.threads[thread_id].state.is_terminated()
570 }
571
572 fn have_all_terminated(&self) -> bool {
574 self.threads.iter().all(|thread| thread.state.is_terminated())
575 }
576
577 fn enable_thread(&mut self, thread_id: ThreadId) {
579 assert!(self.has_terminated(thread_id));
580 self.threads[thread_id].state = ThreadState::Enabled;
581 }
582
583 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
585 &mut self.threads[self.active_thread]
586 }
587
588 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
590 &self.threads[self.active_thread]
591 }
592
593 pub fn thread_ref(&self, thread_id: ThreadId) -> &Thread<'tcx> {
594 &self.threads[thread_id]
595 }
596
597 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
606 trace!("detaching {:?}", id);
608
609 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
610 self.threads[id].join_status == ThreadJoinStatus::Detached
612 } else {
613 self.threads[id].join_status != ThreadJoinStatus::Joinable
614 };
615 if is_ub {
616 throw_ub_format!("trying to detach thread that was already detached or joined");
617 }
618
619 self.threads[id].join_status = ThreadJoinStatus::Detached;
620 interp_ok(())
621 }
622
623 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
625 self.threads[thread].thread_name = Some(new_thread_name);
626 }
627
628 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
630 self.threads[thread].thread_name()
631 }
632
633 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
634 self.threads[thread].thread_display_name(thread)
635 }
636
637 fn block_thread(
639 &mut self,
640 reason: BlockReason,
641 timeout: Option<Timeout>,
642 callback: DynUnblockCallback<'tcx>,
643 ) {
644 let state = &mut self.threads[self.active_thread].state;
645 assert!(state.is_enabled());
646 *state = ThreadState::Blocked { reason, timeout, callback }
647 }
648
649 fn yield_active_thread(&mut self) {
651 self.yield_active_thread = true;
655 }
656
657 fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
659 self.threads
660 .iter()
661 .filter_map(|t| {
662 match &t.state {
663 ThreadState::Blocked { timeout: Some(timeout), .. } =>
664 Some(timeout.get_wait_time(clock)),
665 _ => None,
666 }
667 })
668 .min()
669 }
670}
671
672impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
673trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
674 #[inline]
676 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
677 let this = self.eval_context_mut();
678 let mut found_callback = None;
679 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
681 match &thread.state {
682 ThreadState::Blocked { timeout: Some(timeout), .. }
683 if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
684 {
685 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
686 let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
687 found_callback = Some((id, callback));
688 break;
690 }
691 _ => {}
692 }
693 }
694 if let Some((thread, callback)) = found_callback {
695 let old_thread = this.machine.threads.set_active_thread_id(thread);
702 callback.call(this, UnblockKind::TimedOut)?;
703 this.machine.threads.set_active_thread_id(old_thread);
704 }
705 interp_ok(())
712 }
713
714 #[inline]
715 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
716 let this = self.eval_context_mut();
717 let active_thread = this.active_thread_mut();
718 active_thread.origin_span = DUMMY_SP; let mut callback = active_thread
720 .on_stack_empty
721 .take()
722 .expect("`on_stack_empty` not set up, or already running");
723 let res = callback(this)?;
724 this.active_thread_mut().on_stack_empty = Some(callback);
725 interp_ok(res)
726 }
727
728 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
737 let this = self.eval_context_mut();
738
739 if this.machine.data_race.as_genmc_ref().is_some() {
741 loop {
742 let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
743 let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
744 return interp_ok(SchedulingAction::ExecuteStep);
745 };
746 if this.machine.threads.threads[next_thread_id]
748 .state
749 .is_blocked_on(BlockReason::Genmc)
750 {
751 info!(
752 "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
753 );
754 this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
755 }
756 let thread_manager = &mut this.machine.threads;
759 if thread_manager.threads[next_thread_id].state.is_enabled() {
760 thread_manager.active_thread = next_thread_id;
762 return interp_ok(SchedulingAction::ExecuteStep);
763 }
764 }
765 }
766
767 let thread_manager = &mut this.machine.threads;
769 let clock = &this.machine.monotonic_clock;
770 let rng = this.machine.rng.get_mut();
771 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
773 && !thread_manager.yield_active_thread
774 {
775 return interp_ok(SchedulingAction::ExecuteStep);
777 }
778 let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
785 if potential_sleep_time == Some(Duration::ZERO) {
786 return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
787 }
788 let mut threads_iter = thread_manager
795 .threads
796 .iter_enumerated()
797 .skip(thread_manager.active_thread.index() + 1)
798 .chain(
799 thread_manager
800 .threads
801 .iter_enumerated()
802 .take(thread_manager.active_thread.index() + 1),
803 )
804 .filter(|(_id, thread)| thread.state.is_enabled());
805 let new_thread = if thread_manager.fixed_scheduling {
807 threads_iter.next()
808 } else {
809 threads_iter.choose(rng)
810 };
811
812 if let Some((id, _thread)) = new_thread {
813 if thread_manager.active_thread != id {
814 info!(
815 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
816 thread_manager.get_thread_display_name(id),
817 thread_manager.get_thread_display_name(thread_manager.active_thread)
818 );
819 thread_manager.active_thread = id;
820 }
821 }
822 thread_manager.yield_active_thread = false;
824
825 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
826 return interp_ok(SchedulingAction::ExecuteStep);
827 }
828 if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
830 unreachable!("all threads terminated without the main thread terminating?!");
831 } else if let Some(sleep_time) = potential_sleep_time {
832 interp_ok(SchedulingAction::Sleep(sleep_time))
836 } else {
837 throw_machine_stop!(TerminationInfo::Deadlock);
838 }
839 }
840}
841
842impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
844pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
845 #[inline]
846 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
847 self.eval_context_ref().machine.threads.thread_id_try_from(id)
848 }
849
850 fn get_or_create_thread_local_alloc(
853 &mut self,
854 def_id: DefId,
855 ) -> InterpResult<'tcx, StrictPointer> {
856 let this = self.eval_context_mut();
857 let tcx = this.tcx;
858 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
859 interp_ok(old_alloc)
862 } else {
863 if tcx.is_foreign_item(def_id) {
867 throw_unsup_format!("foreign thread-local statics are not supported");
868 }
869 let params = this.machine.get_default_alloc_params();
870 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
871 let mut alloc = alloc.inner().adjust_from_tcx(
873 &this.tcx,
874 |bytes, align| {
875 interp_ok(MiriAllocBytes::from_bytes(
876 std::borrow::Cow::Borrowed(bytes),
877 align,
878 params,
879 ))
880 },
881 |ptr| this.global_root_pointer(ptr),
882 )?;
883 alloc.mutability = Mutability::Mut;
885 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
887 this.machine.threads.set_thread_local_alloc(def_id, ptr);
888 interp_ok(ptr)
889 }
890 }
891
892 #[inline]
894 fn start_regular_thread(
895 &mut self,
896 thread: Option<MPlaceTy<'tcx>>,
897 start_routine: Pointer,
898 start_abi: ExternAbi,
899 func_arg: ImmTy<'tcx>,
900 ret_layout: TyAndLayout<'tcx>,
901 ) -> InterpResult<'tcx, ThreadId> {
902 let this = self.eval_context_mut();
903
904 let current_span = this.machine.current_user_relevant_span();
906 let new_thread_id = this.machine.threads.create_thread({
907 let mut state = tls::TlsDtorsState::default();
908 Box::new(move |m| state.on_stack_empty(m))
909 });
910 match &mut this.machine.data_race {
911 GlobalDataRaceHandler::None => {}
912 GlobalDataRaceHandler::Vclocks(data_race) =>
913 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
914 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
915 genmc_ctx.handle_thread_create(
916 &this.machine.threads,
917 start_routine,
918 &func_arg,
919 new_thread_id,
920 )?,
921 }
922 if let Some(thread_info_place) = thread {
925 this.write_scalar(
926 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
927 &thread_info_place,
928 )?;
929 }
930
931 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
934
935 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
937 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
938 }
939
940 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
942
943 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
947
948 this.call_thread_root_function(
949 instance,
950 start_abi,
951 &[func_arg],
952 Some(&ret_place),
953 current_span,
954 )?;
955
956 this.machine.threads.set_active_thread_id(old_thread_id);
958
959 interp_ok(new_thread_id)
960 }
961
962 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
967 let this = self.eval_context_mut();
968
969 let thread = this.active_thread_mut();
971 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
972 thread.state = ThreadState::Terminated;
973
974 let gone_thread = this.active_thread();
976 {
977 let mut free_tls_statics = Vec::new();
978 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
979 if thread != gone_thread {
980 return true;
982 }
983 free_tls_statics.push(alloc_id);
986 false
987 });
988 for ptr in free_tls_statics {
990 match tls_alloc_action {
991 TlsAllocAction::Deallocate =>
992 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
993 TlsAllocAction::Leak =>
994 if let Some(alloc) = ptr.provenance.get_alloc_id() {
995 trace!(
996 "Thread-local static leaked and stored as static root: {:?}",
997 alloc
998 );
999 this.machine.static_roots.push(alloc);
1000 },
1001 }
1002 }
1003 }
1004
1005 match &mut this.machine.data_race {
1006 GlobalDataRaceHandler::None => {}
1007 GlobalDataRaceHandler::Vclocks(data_race) =>
1008 data_race.thread_terminated(&this.machine.threads),
1009 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
1010 genmc_ctx.handle_thread_finish(&this.machine.threads)
1013 }
1014 }
1015
1016 let unblock_reason = BlockReason::Join(gone_thread);
1018 let threads = &this.machine.threads.threads;
1019 let joining_threads = threads
1020 .iter_enumerated()
1021 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
1022 .map(|(id, _)| id)
1023 .collect::<Vec<_>>();
1024 for thread in joining_threads {
1025 this.unblock_thread(thread, unblock_reason)?;
1026 }
1027
1028 interp_ok(())
1029 }
1030
1031 #[inline]
1034 fn block_thread(
1035 &mut self,
1036 reason: BlockReason,
1037 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
1038 callback: DynUnblockCallback<'tcx>,
1039 ) {
1040 let this = self.eval_context_mut();
1041 if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1042 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1043 }
1044 let timeout = timeout.map(|(clock, anchor, duration)| {
1045 let anchor = match clock {
1046 TimeoutClock::RealTime => {
1047 assert!(
1048 this.machine.communicate(),
1049 "cannot have `RealTime` timeout with isolation enabled!"
1050 );
1051 Timeout::RealTime(match anchor {
1052 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1053 TimeoutAnchor::Relative => SystemTime::now(),
1054 })
1055 }
1056 TimeoutClock::Monotonic =>
1057 Timeout::Monotonic(match anchor {
1058 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
1059 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
1060 }),
1061 };
1062 anchor.add_lossy(duration)
1063 });
1064 this.machine.threads.block_thread(reason, timeout, callback);
1065 }
1066
1067 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1070 let this = self.eval_context_mut();
1071 let old_state =
1072 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1073 let callback = match old_state {
1074 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1075 assert_eq!(
1076 reason, actual_reason,
1077 "unblock_thread: thread was blocked for the wrong reason"
1078 );
1079 callback
1080 }
1081 _ => panic!("unblock_thread: thread was not blocked"),
1082 };
1083 let old_thread = this.machine.threads.set_active_thread_id(thread);
1085 callback.call(this, UnblockKind::Ready)?;
1086 this.machine.threads.set_active_thread_id(old_thread);
1087 interp_ok(())
1088 }
1089
1090 #[inline]
1091 fn detach_thread(
1092 &mut self,
1093 thread_id: ThreadId,
1094 allow_terminated_joined: bool,
1095 ) -> InterpResult<'tcx> {
1096 let this = self.eval_context_mut();
1097 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1098 }
1099
1100 fn join_thread(
1104 &mut self,
1105 joined_thread_id: ThreadId,
1106 success_retval: Scalar,
1107 return_dest: &MPlaceTy<'tcx>,
1108 ) -> InterpResult<'tcx> {
1109 let this = self.eval_context_mut();
1110 let thread_mgr = &mut this.machine.threads;
1111 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1112 throw_ub_format!("trying to join a detached thread");
1114 }
1115
1116 fn after_join<'tcx>(
1117 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1118 joined_thread_id: ThreadId,
1119 success_retval: Scalar,
1120 return_dest: &MPlaceTy<'tcx>,
1121 ) -> InterpResult<'tcx> {
1122 let threads = &this.machine.threads;
1123 match &mut this.machine.data_race {
1124 GlobalDataRaceHandler::None => {}
1125 GlobalDataRaceHandler::Vclocks(data_race) =>
1126 data_race.thread_joined(threads, joined_thread_id),
1127 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1128 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1129 }
1130 this.write_scalar(success_retval, return_dest)?;
1131 interp_ok(())
1132 }
1133
1134 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1137 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1138 trace!(
1139 "{:?} blocked on {:?} when trying to join",
1140 thread_mgr.active_thread, joined_thread_id
1141 );
1142 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1143 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1144 }
1145
1146 let dest = return_dest.clone();
1149 thread_mgr.block_thread(
1150 BlockReason::Join(joined_thread_id),
1151 None,
1152 callback!(
1153 @capture<'tcx> {
1154 joined_thread_id: ThreadId,
1155 dest: MPlaceTy<'tcx>,
1156 success_retval: Scalar,
1157 }
1158 |this, unblock: UnblockKind| {
1159 assert_eq!(unblock, UnblockKind::Ready);
1160 after_join(this, joined_thread_id, success_retval, &dest)
1161 }
1162 ),
1163 );
1164 } else {
1165 after_join(this, joined_thread_id, success_retval, return_dest)?;
1167 }
1168 interp_ok(())
1169 }
1170
1171 fn join_thread_exclusive(
1176 &mut self,
1177 joined_thread_id: ThreadId,
1178 success_retval: Scalar,
1179 return_dest: &MPlaceTy<'tcx>,
1180 ) -> InterpResult<'tcx> {
1181 let this = self.eval_context_mut();
1182 let threads = &this.machine.threads.threads;
1183 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1184 throw_ub_format!("trying to join an already joined thread");
1185 }
1186
1187 if joined_thread_id == this.machine.threads.active_thread {
1188 throw_ub_format!("trying to join itself");
1189 }
1190
1191 assert!(
1193 threads
1194 .iter()
1195 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1196 "this thread already has threads waiting for its termination"
1197 );
1198
1199 this.join_thread(joined_thread_id, success_retval, return_dest)
1200 }
1201
1202 #[inline]
1203 fn active_thread(&self) -> ThreadId {
1204 let this = self.eval_context_ref();
1205 this.machine.threads.active_thread()
1206 }
1207
1208 #[inline]
1209 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1210 let this = self.eval_context_mut();
1211 this.machine.threads.active_thread_mut()
1212 }
1213
1214 #[inline]
1215 fn active_thread_ref(&self) -> &Thread<'tcx> {
1216 let this = self.eval_context_ref();
1217 this.machine.threads.active_thread_ref()
1218 }
1219
1220 #[inline]
1221 fn get_total_thread_count(&self) -> usize {
1222 let this = self.eval_context_ref();
1223 this.machine.threads.get_total_thread_count()
1224 }
1225
1226 #[inline]
1227 fn have_all_terminated(&self) -> bool {
1228 let this = self.eval_context_ref();
1229 this.machine.threads.have_all_terminated()
1230 }
1231
1232 #[inline]
1233 fn enable_thread(&mut self, thread_id: ThreadId) {
1234 let this = self.eval_context_mut();
1235 this.machine.threads.enable_thread(thread_id);
1236 }
1237
1238 #[inline]
1239 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1240 let this = self.eval_context_ref();
1241 this.machine.threads.active_thread_stack()
1242 }
1243
1244 #[inline]
1245 fn active_thread_stack_mut<'a>(
1246 &'a mut self,
1247 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1248 let this = self.eval_context_mut();
1249 this.machine.threads.active_thread_stack_mut()
1250 }
1251
1252 #[inline]
1254 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1255 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1256 }
1257
1258 #[inline]
1259 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1260 where
1261 'tcx: 'c,
1262 {
1263 self.eval_context_ref().machine.threads.get_thread_name(thread)
1264 }
1265
1266 #[inline]
1267 fn yield_active_thread(&mut self) {
1268 self.eval_context_mut().machine.threads.yield_active_thread();
1269 }
1270
1271 #[inline]
1272 fn maybe_preempt_active_thread(&mut self) {
1273 use rand::Rng as _;
1274
1275 let this = self.eval_context_mut();
1276 if !this.machine.threads.fixed_scheduling
1277 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1278 {
1279 this.yield_active_thread();
1280 }
1281 }
1282
1283 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1286 let this = self.eval_context_mut();
1287 loop {
1288 if CTRL_C_RECEIVED.load(Relaxed) {
1289 this.machine.handle_abnormal_termination();
1290 throw_machine_stop!(TerminationInfo::Interrupted);
1291 }
1292 match this.schedule()? {
1293 SchedulingAction::ExecuteStep => {
1294 if !this.step()? {
1295 match this.run_on_stack_empty()? {
1297 Poll::Pending => {} Poll::Ready(()) =>
1299 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1300 }
1301 }
1302 }
1303 SchedulingAction::ExecuteTimeoutCallback => {
1304 this.run_timeout_callback()?;
1305 }
1306 SchedulingAction::Sleep(duration) => {
1307 this.machine.monotonic_clock.sleep(duration);
1308 }
1309 }
1310 }
1311 }
1312}