1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use rand::seq::IteratorRandom;
9use rustc_abi::ExternAbi;
10use rustc_const_eval::CTRL_C_RECEIVED;
11use rustc_data_structures::either::Either;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18
19use crate::concurrency::GlobalDataRaceHandler;
20use crate::shims::tls;
21use crate::*;
22
/// What the scheduler asks the interpreter loop to do next.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SchedulingAction {
    /// Execute one step on the active thread.
    ExecuteStep,
    /// Run the unblock callback of a blocked thread whose timeout has expired.
    ExecuteTimeoutCallback,
    /// No thread is runnable right now; sleep for the given duration.
    Sleep(Duration),
}
32
/// What to do with the thread-local static allocations of a thread that terminates.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TlsAllocAction {
    /// Deallocate the backing memory of the thread's thread-local statics.
    Deallocate,
    /// Keep the memory and register each allocation as a static root instead of freeing it.
    Leak,
}
42
/// Argument passed to an unblock callback, telling it why the thread was unblocked.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UnblockKind {
    /// The thread was unblocked explicitly (via `unblock_thread`).
    Ready,
    /// The timeout attached to the block expired first.
    TimedOut,
}
51
52pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
55
/// Identifier of an interpreted thread; a thin wrapper around a `u32`.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(u32);

impl ThreadId {
    /// The id of the main thread, which is always `0`.
    pub const MAIN_THREAD: ThreadId = ThreadId(0);

    /// Wraps `id` without checking that a thread with this id exists.
    pub fn new_unchecked(id: u32) -> Self {
        ThreadId(id)
    }

    /// Returns the raw numeric value of this id.
    pub fn to_u32(self) -> u32 {
        let ThreadId(raw) = self;
        raw
    }
}
72
73impl Idx for ThreadId {
74 fn new(idx: usize) -> Self {
75 ThreadId(u32::try_from(idx).unwrap())
76 }
77
78 fn index(self) -> usize {
79 usize::try_from(self.0).unwrap()
80 }
81}
82
83impl From<ThreadId> for u64 {
84 fn from(t: ThreadId) -> Self {
85 t.0.into()
86 }
87}
88
89#[derive(Debug, Copy, Clone, PartialEq, Eq)]
91pub enum BlockReason {
92 Join(ThreadId),
95 Sleep,
97 Mutex,
99 Condvar,
101 RwLock,
103 Futex,
105 InitOnce,
107 Epoll,
109 Eventfd,
111 UnnamedSocket,
113 Genmc,
116}
117
118enum ThreadState<'tcx> {
120 Enabled,
122 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
124 Terminated,
127}
128
129impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 match self {
132 Self::Enabled => write!(f, "Enabled"),
133 Self::Blocked { reason, timeout, .. } =>
134 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
135 Self::Terminated => write!(f, "Terminated"),
136 }
137 }
138}
139
140impl<'tcx> ThreadState<'tcx> {
141 fn is_enabled(&self) -> bool {
142 matches!(self, ThreadState::Enabled)
143 }
144
145 fn is_terminated(&self) -> bool {
146 matches!(self, ThreadState::Terminated)
147 }
148
149 fn is_blocked_on(&self, reason: BlockReason) -> bool {
150 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
151 }
152}
153
/// Tracks whether a thread may still be joined or detached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ThreadJoinStatus {
    /// Initial status of every thread: it can be joined or detached.
    Joinable,
    /// The thread was detached; joining or detaching it again is reported as UB.
    Detached,
    /// Some thread has started (or completed) a join on this thread.
    Joined,
}
165
166pub struct Thread<'tcx> {
168 state: ThreadState<'tcx>,
169
170 thread_name: Option<Vec<u8>>,
172
173 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
175
176 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
181
182 top_user_relevant_frame: Option<usize>,
188
189 join_status: ThreadJoinStatus,
191
192 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
201
202 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
204}
205
206pub type StackEmptyCallback<'tcx> =
207 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
208
209impl<'tcx> Thread<'tcx> {
210 fn thread_name(&self) -> Option<&[u8]> {
212 self.thread_name.as_deref()
213 }
214
215 pub fn is_enabled(&self) -> bool {
217 self.state.is_enabled()
218 }
219
220 fn thread_display_name(&self, id: ThreadId) -> String {
222 if let Some(ref thread_name) = self.thread_name {
223 String::from_utf8_lossy(thread_name).into_owned()
224 } else {
225 format!("unnamed-{}", id.index())
226 }
227 }
228
229 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
235 self.stack
236 .iter()
237 .enumerate()
238 .rev()
239 .skip(skip)
240 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
241 }
242
243 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
246 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
247 }
248
249 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
252 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
253 self.top_user_relevant_frame = Some(frame_idx);
254 }
255
256 pub fn top_user_relevant_frame(&self) -> Option<usize> {
259 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
260 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
264 }
265
266 pub fn current_user_relevant_span(&self) -> Span {
267 self.top_user_relevant_frame()
268 .map(|frame_idx| self.stack[frame_idx].current_span())
269 .unwrap_or(rustc_span::DUMMY_SP)
270 }
271}
272
273impl<'tcx> std::fmt::Debug for Thread<'tcx> {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 write!(
276 f,
277 "{}({:?}, {:?})",
278 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
279 self.state,
280 self.join_status
281 )
282 }
283}
284
285impl<'tcx> Thread<'tcx> {
286 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
287 Self {
288 state: ThreadState::Enabled,
289 thread_name: name.map(|name| Vec::from(name.as_bytes())),
290 stack: Vec::new(),
291 top_user_relevant_frame: None,
292 join_status: ThreadJoinStatus::Joinable,
293 unwind_payloads: Vec::new(),
294 last_error: None,
295 on_stack_empty,
296 }
297 }
298}
299
300impl VisitProvenance for Thread<'_> {
301 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
302 let Thread {
303 unwind_payloads: panic_payload,
304 last_error,
305 stack,
306 top_user_relevant_frame: _,
307 state: _,
308 thread_name: _,
309 join_status: _,
310 on_stack_empty: _, } = self;
312
313 for payload in panic_payload {
314 payload.visit_provenance(visit);
315 }
316 last_error.visit_provenance(visit);
317 for frame in stack {
318 frame.visit_provenance(visit)
319 }
320 }
321}
322
323impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
324 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
325 let Frame {
326 return_place,
327 locals,
328 extra,
329 ..
331 } = self;
332
333 return_place.visit_provenance(visit);
335 for local in locals.iter() {
337 match local.as_mplace_or_imm() {
338 None => {}
339 Some(Either::Left((ptr, meta))) => {
340 ptr.visit_provenance(visit);
341 meta.visit_provenance(visit);
342 }
343 Some(Either::Right(imm)) => {
344 imm.visit_provenance(visit);
345 }
346 }
347 }
348
349 extra.visit_provenance(visit);
350 }
351}
352
353#[derive(Debug)]
355enum Timeout {
356 Monotonic(Instant),
357 RealTime(SystemTime),
358}
359
360impl Timeout {
361 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
363 match self {
364 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
365 Timeout::RealTime(time) =>
366 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
367 }
368 }
369
370 fn add_lossy(&self, duration: Duration) -> Self {
372 match self {
373 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
374 Timeout::RealTime(s) => {
375 Timeout::RealTime(
377 s.checked_add(duration)
378 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
379 )
380 }
381 }
382 }
383}
384
/// Selects the clock a timeout is measured against.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimeoutClock {
    /// The machine's monotonic clock.
    Monotonic,
    /// The host's system clock.
    RealTime,
}
391
/// Selects the reference point a timeout duration is added to.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimeoutAnchor {
    /// The duration counts from the current time of the chosen clock.
    Relative,
    /// The duration counts from the epoch of the chosen clock.
    Absolute,
}
398
/// Error returned when a numeric value does not identify an existing thread.
#[derive(Debug, Copy, Clone)]
pub struct ThreadNotFound;
402
403#[derive(Debug)]
405pub struct ThreadManager<'tcx> {
406 active_thread: ThreadId,
408 threads: IndexVec<ThreadId, Thread<'tcx>>,
412 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
414 yield_active_thread: bool,
417 fixed_scheduling: bool,
419}
420
421impl VisitProvenance for ThreadManager<'_> {
422 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
423 let ThreadManager {
424 threads,
425 thread_local_allocs,
426 active_thread: _,
427 yield_active_thread: _,
428 fixed_scheduling: _,
429 } = self;
430
431 for thread in threads {
432 thread.visit_provenance(visit);
433 }
434 for ptr in thread_local_allocs.values() {
435 ptr.visit_provenance(visit);
436 }
437 }
438}
439
440impl<'tcx> ThreadManager<'tcx> {
441 pub(crate) fn new(config: &MiriConfig) -> Self {
442 let mut threads = IndexVec::new();
443 threads.push(Thread::new(Some("main"), None));
445 Self {
446 active_thread: ThreadId::MAIN_THREAD,
447 threads,
448 thread_local_allocs: Default::default(),
449 yield_active_thread: false,
450 fixed_scheduling: config.fixed_scheduling,
451 }
452 }
453
454 pub(crate) fn init(
455 ecx: &mut MiriInterpCx<'tcx>,
456 on_main_stack_empty: StackEmptyCallback<'tcx>,
457 ) {
458 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
459 Some(on_main_stack_empty);
460 if ecx.tcx.sess.target.os.as_ref() != "windows" {
461 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
463 ThreadJoinStatus::Detached;
464 }
465 }
466
467 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
468 if let Ok(id) = id.try_into()
469 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
470 {
471 Ok(ThreadId(id))
472 } else {
473 Err(ThreadNotFound)
474 }
475 }
476
477 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
480 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
481 }
482
483 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
488 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
489 }
490
491 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
493 &self.threads[self.active_thread].stack
494 }
495
496 pub fn active_thread_stack_mut(
498 &mut self,
499 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
500 &mut self.threads[self.active_thread].stack
501 }
502
503 pub fn all_stacks(
504 &self,
505 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
506 self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
507 }
508
509 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
511 let new_thread_id = ThreadId::new(self.threads.len());
512 self.threads.push(Thread::new(None, Some(on_stack_empty)));
513 new_thread_id
514 }
515
516 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
518 assert!(id.index() < self.threads.len());
519 info!(
520 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
521 self.get_thread_display_name(id),
522 self.get_thread_display_name(self.active_thread)
523 );
524 std::mem::replace(&mut self.active_thread, id)
525 }
526
527 pub fn active_thread(&self) -> ThreadId {
529 self.active_thread
530 }
531
532 pub fn get_total_thread_count(&self) -> usize {
534 self.threads.len()
535 }
536
537 pub fn get_live_thread_count(&self) -> usize {
540 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
541 }
542
543 fn has_terminated(&self, thread_id: ThreadId) -> bool {
545 self.threads[thread_id].state.is_terminated()
546 }
547
548 fn have_all_terminated(&self) -> bool {
550 self.threads.iter().all(|thread| thread.state.is_terminated())
551 }
552
553 fn enable_thread(&mut self, thread_id: ThreadId) {
555 assert!(self.has_terminated(thread_id));
556 self.threads[thread_id].state = ThreadState::Enabled;
557 }
558
559 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
561 &mut self.threads[self.active_thread]
562 }
563
564 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
566 &self.threads[self.active_thread]
567 }
568
569 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
578 trace!("detaching {:?}", id);
580
581 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
582 self.threads[id].join_status == ThreadJoinStatus::Detached
584 } else {
585 self.threads[id].join_status != ThreadJoinStatus::Joinable
586 };
587 if is_ub {
588 throw_ub_format!("trying to detach thread that was already detached or joined");
589 }
590
591 self.threads[id].join_status = ThreadJoinStatus::Detached;
592 interp_ok(())
593 }
594
595 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
597 self.threads[thread].thread_name = Some(new_thread_name);
598 }
599
600 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
602 self.threads[thread].thread_name()
603 }
604
605 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
606 self.threads[thread].thread_display_name(thread)
607 }
608
609 fn block_thread(
611 &mut self,
612 reason: BlockReason,
613 timeout: Option<Timeout>,
614 callback: DynUnblockCallback<'tcx>,
615 ) {
616 let state = &mut self.threads[self.active_thread].state;
617 assert!(state.is_enabled());
618 *state = ThreadState::Blocked { reason, timeout, callback }
619 }
620
621 fn yield_active_thread(&mut self) {
623 self.yield_active_thread = true;
627 }
628
629 fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
631 self.threads
632 .iter()
633 .filter_map(|t| {
634 match &t.state {
635 ThreadState::Blocked { timeout: Some(timeout), .. } =>
636 Some(timeout.get_wait_time(clock)),
637 _ => None,
638 }
639 })
640 .min()
641 }
642}
643
644impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
645trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
646 #[inline]
648 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
649 let this = self.eval_context_mut();
650 let mut found_callback = None;
651 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
653 match &thread.state {
654 ThreadState::Blocked { timeout: Some(timeout), .. }
655 if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
656 {
657 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
658 let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
659 found_callback = Some((id, callback));
660 break;
662 }
663 _ => {}
664 }
665 }
666 if let Some((thread, callback)) = found_callback {
667 let old_thread = this.machine.threads.set_active_thread_id(thread);
674 callback.call(this, UnblockKind::TimedOut)?;
675 this.machine.threads.set_active_thread_id(old_thread);
676 }
677 interp_ok(())
684 }
685
686 #[inline]
687 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
688 let this = self.eval_context_mut();
689 let mut callback = this
690 .active_thread_mut()
691 .on_stack_empty
692 .take()
693 .expect("`on_stack_empty` not set up, or already running");
694 let res = callback(this)?;
695 this.active_thread_mut().on_stack_empty = Some(callback);
696 interp_ok(res)
697 }
698
699 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
708 let this = self.eval_context_mut();
709
710 if this.machine.data_race.as_genmc_ref().is_some() {
712 loop {
713 let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
714 let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
715 return interp_ok(SchedulingAction::ExecuteStep);
716 };
717 if this.machine.threads.threads[next_thread_id]
719 .state
720 .is_blocked_on(BlockReason::Genmc)
721 {
722 info!(
723 "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
724 );
725 this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
726 }
727 let thread_manager = &mut this.machine.threads;
730 if thread_manager.threads[next_thread_id].state.is_enabled() {
731 thread_manager.active_thread = next_thread_id;
733 return interp_ok(SchedulingAction::ExecuteStep);
734 }
735 }
736 }
737
738 let thread_manager = &mut this.machine.threads;
740 let clock = &this.machine.monotonic_clock;
741 let rng = this.machine.rng.get_mut();
742 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
744 && !thread_manager.yield_active_thread
745 {
746 return interp_ok(SchedulingAction::ExecuteStep);
748 }
749 let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
756 if potential_sleep_time == Some(Duration::ZERO) {
757 return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
758 }
759 let mut threads_iter = thread_manager
766 .threads
767 .iter_enumerated()
768 .skip(thread_manager.active_thread.index() + 1)
769 .chain(
770 thread_manager
771 .threads
772 .iter_enumerated()
773 .take(thread_manager.active_thread.index() + 1),
774 )
775 .filter(|(_id, thread)| thread.state.is_enabled());
776 let new_thread = if thread_manager.fixed_scheduling {
778 threads_iter.next()
779 } else {
780 threads_iter.choose(rng)
781 };
782
783 if let Some((id, _thread)) = new_thread {
784 if thread_manager.active_thread != id {
785 info!(
786 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
787 thread_manager.get_thread_display_name(id),
788 thread_manager.get_thread_display_name(thread_manager.active_thread)
789 );
790 thread_manager.active_thread = id;
791 }
792 }
793 thread_manager.yield_active_thread = false;
795
796 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
797 return interp_ok(SchedulingAction::ExecuteStep);
798 }
799 if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
801 unreachable!("all threads terminated without the main thread terminating?!");
802 } else if let Some(sleep_time) = potential_sleep_time {
803 interp_ok(SchedulingAction::Sleep(sleep_time))
807 } else {
808 throw_machine_stop!(TerminationInfo::Deadlock);
809 }
810 }
811}
812
813impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
815pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
816 #[inline]
817 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
818 self.eval_context_ref().machine.threads.thread_id_try_from(id)
819 }
820
821 fn get_or_create_thread_local_alloc(
824 &mut self,
825 def_id: DefId,
826 ) -> InterpResult<'tcx, StrictPointer> {
827 let this = self.eval_context_mut();
828 let tcx = this.tcx;
829 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
830 interp_ok(old_alloc)
833 } else {
834 if tcx.is_foreign_item(def_id) {
838 throw_unsup_format!("foreign thread-local statics are not supported");
839 }
840 let params = this.machine.get_default_alloc_params();
841 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
842 let mut alloc = alloc.inner().adjust_from_tcx(
844 &this.tcx,
845 |bytes, align| {
846 interp_ok(MiriAllocBytes::from_bytes(
847 std::borrow::Cow::Borrowed(bytes),
848 align,
849 params,
850 ))
851 },
852 |ptr| this.global_root_pointer(ptr),
853 )?;
854 alloc.mutability = Mutability::Mut;
856 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
858 this.machine.threads.set_thread_local_alloc(def_id, ptr);
859 interp_ok(ptr)
860 }
861 }
862
863 #[inline]
865 fn start_regular_thread(
866 &mut self,
867 thread: Option<MPlaceTy<'tcx>>,
868 start_routine: Pointer,
869 start_abi: ExternAbi,
870 func_arg: ImmTy<'tcx>,
871 ret_layout: TyAndLayout<'tcx>,
872 ) -> InterpResult<'tcx, ThreadId> {
873 let this = self.eval_context_mut();
874
875 let new_thread_id = this.machine.threads.create_thread({
877 let mut state = tls::TlsDtorsState::default();
878 Box::new(move |m| state.on_stack_empty(m))
879 });
880 let current_span = this.machine.current_user_relevant_span();
881 match &mut this.machine.data_race {
882 GlobalDataRaceHandler::None => {}
883 GlobalDataRaceHandler::Vclocks(data_race) =>
884 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
885 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
886 genmc_ctx.handle_thread_create(
887 &this.machine.threads,
888 start_routine,
889 &func_arg,
890 new_thread_id,
891 )?,
892 }
893 if let Some(thread_info_place) = thread {
896 this.write_scalar(
897 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
898 &thread_info_place,
899 )?;
900 }
901
902 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
905
906 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
908 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
909 }
910
911 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
913
914 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
918
919 this.call_function(
920 instance,
921 start_abi,
922 &[func_arg],
923 Some(&ret_place),
924 ReturnContinuation::Stop { cleanup: true },
925 )?;
926
927 this.machine.threads.set_active_thread_id(old_thread_id);
929
930 interp_ok(new_thread_id)
931 }
932
933 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
938 let this = self.eval_context_mut();
939
940 let thread = this.active_thread_mut();
942 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
943 thread.state = ThreadState::Terminated;
944
945 let gone_thread = this.active_thread();
947 {
948 let mut free_tls_statics = Vec::new();
949 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
950 if thread != gone_thread {
951 return true;
953 }
954 free_tls_statics.push(alloc_id);
957 false
958 });
959 for ptr in free_tls_statics {
961 match tls_alloc_action {
962 TlsAllocAction::Deallocate =>
963 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
964 TlsAllocAction::Leak =>
965 if let Some(alloc) = ptr.provenance.get_alloc_id() {
966 trace!(
967 "Thread-local static leaked and stored as static root: {:?}",
968 alloc
969 );
970 this.machine.static_roots.push(alloc);
971 },
972 }
973 }
974 }
975
976 match &mut this.machine.data_race {
977 GlobalDataRaceHandler::None => {}
978 GlobalDataRaceHandler::Vclocks(data_race) =>
979 data_race.thread_terminated(&this.machine.threads),
980 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
981 genmc_ctx.handle_thread_finish(&this.machine.threads)
984 }
985 }
986
987 let unblock_reason = BlockReason::Join(gone_thread);
989 let threads = &this.machine.threads.threads;
990 let joining_threads = threads
991 .iter_enumerated()
992 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
993 .map(|(id, _)| id)
994 .collect::<Vec<_>>();
995 for thread in joining_threads {
996 this.unblock_thread(thread, unblock_reason)?;
997 }
998
999 interp_ok(())
1000 }
1001
1002 #[inline]
1005 fn block_thread(
1006 &mut self,
1007 reason: BlockReason,
1008 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
1009 callback: DynUnblockCallback<'tcx>,
1010 ) {
1011 let this = self.eval_context_mut();
1012 if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1013 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1014 }
1015 let timeout = timeout.map(|(clock, anchor, duration)| {
1016 let anchor = match clock {
1017 TimeoutClock::RealTime => {
1018 assert!(
1019 this.machine.communicate(),
1020 "cannot have `RealTime` timeout with isolation enabled!"
1021 );
1022 Timeout::RealTime(match anchor {
1023 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1024 TimeoutAnchor::Relative => SystemTime::now(),
1025 })
1026 }
1027 TimeoutClock::Monotonic =>
1028 Timeout::Monotonic(match anchor {
1029 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
1030 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
1031 }),
1032 };
1033 anchor.add_lossy(duration)
1034 });
1035 this.machine.threads.block_thread(reason, timeout, callback);
1036 }
1037
1038 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1041 let this = self.eval_context_mut();
1042 let old_state =
1043 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1044 let callback = match old_state {
1045 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1046 assert_eq!(
1047 reason, actual_reason,
1048 "unblock_thread: thread was blocked for the wrong reason"
1049 );
1050 callback
1051 }
1052 _ => panic!("unblock_thread: thread was not blocked"),
1053 };
1054 let old_thread = this.machine.threads.set_active_thread_id(thread);
1056 callback.call(this, UnblockKind::Ready)?;
1057 this.machine.threads.set_active_thread_id(old_thread);
1058 interp_ok(())
1059 }
1060
1061 #[inline]
1062 fn detach_thread(
1063 &mut self,
1064 thread_id: ThreadId,
1065 allow_terminated_joined: bool,
1066 ) -> InterpResult<'tcx> {
1067 let this = self.eval_context_mut();
1068 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1069 }
1070
1071 fn join_thread(
1075 &mut self,
1076 joined_thread_id: ThreadId,
1077 success_retval: Scalar,
1078 return_dest: &MPlaceTy<'tcx>,
1079 ) -> InterpResult<'tcx> {
1080 let this = self.eval_context_mut();
1081 let thread_mgr = &mut this.machine.threads;
1082 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1083 throw_ub_format!("trying to join a detached thread");
1085 }
1086
1087 fn after_join<'tcx>(
1088 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1089 joined_thread_id: ThreadId,
1090 success_retval: Scalar,
1091 return_dest: &MPlaceTy<'tcx>,
1092 ) -> InterpResult<'tcx> {
1093 let threads = &this.machine.threads;
1094 match &mut this.machine.data_race {
1095 GlobalDataRaceHandler::None => {}
1096 GlobalDataRaceHandler::Vclocks(data_race) =>
1097 data_race.thread_joined(threads, joined_thread_id),
1098 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1099 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1100 }
1101 this.write_scalar(success_retval, return_dest)?;
1102 interp_ok(())
1103 }
1104
1105 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1108 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1109 trace!(
1110 "{:?} blocked on {:?} when trying to join",
1111 thread_mgr.active_thread, joined_thread_id
1112 );
1113 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1114 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1115 }
1116
1117 let dest = return_dest.clone();
1120 thread_mgr.block_thread(
1121 BlockReason::Join(joined_thread_id),
1122 None,
1123 callback!(
1124 @capture<'tcx> {
1125 joined_thread_id: ThreadId,
1126 dest: MPlaceTy<'tcx>,
1127 success_retval: Scalar,
1128 }
1129 |this, unblock: UnblockKind| {
1130 assert_eq!(unblock, UnblockKind::Ready);
1131 after_join(this, joined_thread_id, success_retval, &dest)
1132 }
1133 ),
1134 );
1135 } else {
1136 after_join(this, joined_thread_id, success_retval, return_dest)?;
1138 }
1139 interp_ok(())
1140 }
1141
1142 fn join_thread_exclusive(
1147 &mut self,
1148 joined_thread_id: ThreadId,
1149 success_retval: Scalar,
1150 return_dest: &MPlaceTy<'tcx>,
1151 ) -> InterpResult<'tcx> {
1152 let this = self.eval_context_mut();
1153 let threads = &this.machine.threads.threads;
1154 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1155 throw_ub_format!("trying to join an already joined thread");
1156 }
1157
1158 if joined_thread_id == this.machine.threads.active_thread {
1159 throw_ub_format!("trying to join itself");
1160 }
1161
1162 assert!(
1164 threads
1165 .iter()
1166 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1167 "this thread already has threads waiting for its termination"
1168 );
1169
1170 this.join_thread(joined_thread_id, success_retval, return_dest)
1171 }
1172
1173 #[inline]
1174 fn active_thread(&self) -> ThreadId {
1175 let this = self.eval_context_ref();
1176 this.machine.threads.active_thread()
1177 }
1178
1179 #[inline]
1180 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1181 let this = self.eval_context_mut();
1182 this.machine.threads.active_thread_mut()
1183 }
1184
1185 #[inline]
1186 fn active_thread_ref(&self) -> &Thread<'tcx> {
1187 let this = self.eval_context_ref();
1188 this.machine.threads.active_thread_ref()
1189 }
1190
1191 #[inline]
1192 fn get_total_thread_count(&self) -> usize {
1193 let this = self.eval_context_ref();
1194 this.machine.threads.get_total_thread_count()
1195 }
1196
1197 #[inline]
1198 fn have_all_terminated(&self) -> bool {
1199 let this = self.eval_context_ref();
1200 this.machine.threads.have_all_terminated()
1201 }
1202
1203 #[inline]
1204 fn enable_thread(&mut self, thread_id: ThreadId) {
1205 let this = self.eval_context_mut();
1206 this.machine.threads.enable_thread(thread_id);
1207 }
1208
1209 #[inline]
1210 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1211 let this = self.eval_context_ref();
1212 this.machine.threads.active_thread_stack()
1213 }
1214
1215 #[inline]
1216 fn active_thread_stack_mut<'a>(
1217 &'a mut self,
1218 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1219 let this = self.eval_context_mut();
1220 this.machine.threads.active_thread_stack_mut()
1221 }
1222
1223 #[inline]
1225 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1226 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1227 }
1228
1229 #[inline]
1230 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1231 where
1232 'tcx: 'c,
1233 {
1234 self.eval_context_ref().machine.threads.get_thread_name(thread)
1235 }
1236
1237 #[inline]
1238 fn yield_active_thread(&mut self) {
1239 self.eval_context_mut().machine.threads.yield_active_thread();
1240 }
1241
1242 #[inline]
1243 fn maybe_preempt_active_thread(&mut self) {
1244 use rand::Rng as _;
1245
1246 let this = self.eval_context_mut();
1247 if !this.machine.threads.fixed_scheduling
1248 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1249 {
1250 this.yield_active_thread();
1251 }
1252 }
1253
1254 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1257 let this = self.eval_context_mut();
1258 loop {
1259 if CTRL_C_RECEIVED.load(Relaxed) {
1260 this.machine.handle_abnormal_termination();
1261 throw_machine_stop!(TerminationInfo::Interrupted);
1262 }
1263 match this.schedule()? {
1264 SchedulingAction::ExecuteStep => {
1265 if !this.step()? {
1266 match this.run_on_stack_empty()? {
1268 Poll::Pending => {} Poll::Ready(()) =>
1270 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1271 }
1272 }
1273 }
1274 SchedulingAction::ExecuteTimeoutCallback => {
1275 this.run_timeout_callback()?;
1276 }
1277 SchedulingAction::Sleep(duration) => {
1278 this.machine.monotonic_clock.sleep(duration);
1279 }
1280 }
1281 }
1282 }
1283}