1use std::sync::atomic::Ordering::Relaxed;
4use std::task::Poll;
5use std::time::{Duration, SystemTime};
6use std::{io, mem};
7
8use rand::RngExt;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::either::Either;
13use rustc_data_structures::fx::FxHashMap;
14use rustc_hir::def_id::DefId;
15use rustc_index::{Idx, IndexVec};
16use rustc_middle::mir::Mutability;
17use rustc_middle::ty::layout::TyAndLayout;
18use rustc_span::{DUMMY_SP, Span};
19use rustc_target::spec::Os;
20
21use crate::concurrency::GlobalDataRaceHandler;
22use crate::shims::{Epoll, EpollEvalContextExt, FileDescriptionRef, tls};
23use crate::*;
24
25#[derive(Clone, Copy, Debug, PartialEq)]
26enum SchedulingAction {
27 ExecuteStep,
29 SleepAndWaitForIo(Option<Duration>),
34}
35
36#[derive(Clone, Copy, Debug, PartialEq)]
38pub enum TlsAllocAction {
39 Deallocate,
41 Leak,
44}
45
46#[derive(Clone, Copy, Debug, PartialEq)]
48pub enum UnblockKind {
49 Ready,
51 TimedOut,
53}
54
55pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
58
59#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
61pub struct ThreadId(u32);
62
63impl ThreadId {
64 pub fn to_u32(self) -> u32 {
65 self.0
66 }
67
68 pub fn new_unchecked(id: u32) -> Self {
70 Self(id)
71 }
72
73 pub const MAIN_THREAD: ThreadId = ThreadId(0);
74}
75
76impl Idx for ThreadId {
77 fn new(idx: usize) -> Self {
78 ThreadId(u32::try_from(idx).unwrap())
79 }
80
81 fn index(self) -> usize {
82 usize::try_from(self.0).unwrap()
83 }
84}
85
86impl From<ThreadId> for u64 {
87 fn from(t: ThreadId) -> Self {
88 t.0.into()
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
94pub enum BlockReason {
95 Join(ThreadId),
98 Sleep,
100 Mutex,
102 Condvar,
104 RwLock,
106 Futex,
108 InitOnce,
110 Epoll { epfd: FileDescriptionRef<Epoll> },
112 Eventfd,
114 VirtualSocket,
116 IO,
118 Genmc,
121}
122
123enum ThreadState<'tcx> {
125 Enabled,
127 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
129 Terminated,
132}
133
134impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Enabled => write!(f, "Enabled"),
138 Self::Blocked { reason, timeout, .. } =>
139 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
140 Self::Terminated => write!(f, "Terminated"),
141 }
142 }
143}
144
145impl<'tcx> ThreadState<'tcx> {
146 fn is_enabled(&self) -> bool {
147 matches!(self, ThreadState::Enabled)
148 }
149
150 fn is_terminated(&self) -> bool {
151 matches!(self, ThreadState::Terminated)
152 }
153
154 fn is_blocked_on(&self, reason: &BlockReason) -> bool {
155 matches!(self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
156 }
157}
158
159#[derive(Debug, Copy, Clone, PartialEq, Eq)]
161enum ThreadJoinStatus {
162 Joinable,
164 Detached,
167 Joined,
169}
170
171pub struct Thread<'tcx> {
173 state: ThreadState<'tcx>,
174
175 thread_name: Option<Vec<u8>>,
177
178 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
180
181 pub(crate) origin_span: Span,
184
185 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
190
191 top_user_relevant_frame: Option<usize>,
196
197 join_status: ThreadJoinStatus,
199
200 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
209
210 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
212}
213
214pub type StackEmptyCallback<'tcx> =
215 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
216
217impl<'tcx> Thread<'tcx> {
218 fn thread_name(&self) -> Option<&[u8]> {
220 self.thread_name.as_deref()
221 }
222
223 pub fn is_enabled(&self) -> bool {
225 self.state.is_enabled()
226 }
227
228 fn thread_display_name(&self, id: ThreadId) -> String {
230 if let Some(ref thread_name) = self.thread_name {
231 String::from_utf8_lossy(thread_name).into_owned()
232 } else {
233 format!("unnamed-{}", id.index())
234 }
235 }
236
237 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
243 let mut best = None;
245 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
246 let relevance = frame.extra.user_relevance;
247 if relevance == u8::MAX {
248 return Some(idx);
250 }
251 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
252 best = Some((idx, relevance));
255 }
256 }
257 best.map(|(idx, _relevance)| idx)
258 }
259
260 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
263 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
264 }
265
266 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
269 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
270 self.top_user_relevant_frame = Some(frame_idx);
271 }
272
273 pub fn top_user_relevant_frame(&self) -> Option<usize> {
276 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
280 }
281
282 pub fn current_user_relevance(&self) -> u8 {
283 self.top_user_relevant_frame()
284 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
285 .unwrap_or(0)
286 }
287
288 pub fn current_user_relevant_span(&self) -> Span {
289 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
290 self.top_user_relevant_frame()
291 .map(|frame_idx| self.stack[frame_idx].current_span())
292 .unwrap_or(rustc_span::DUMMY_SP)
293 }
294}
295
296impl<'tcx> std::fmt::Debug for Thread<'tcx> {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(
299 f,
300 "{}({:?}, {:?})",
301 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
302 self.state,
303 self.join_status
304 )
305 }
306}
307
308impl<'tcx> Thread<'tcx> {
309 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
310 Self {
311 state: ThreadState::Enabled,
312 thread_name: name.map(|name| Vec::from(name.as_bytes())),
313 stack: Vec::new(),
314 origin_span: DUMMY_SP,
315 top_user_relevant_frame: None,
316 join_status: ThreadJoinStatus::Joinable,
317 unwind_payloads: Vec::new(),
318 last_error: None,
319 on_stack_empty,
320 }
321 }
322}
323
324impl VisitProvenance for Thread<'_> {
325 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
326 let Thread {
327 unwind_payloads: panic_payload,
328 last_error,
329 stack,
330 origin_span: _,
331 top_user_relevant_frame: _,
332 state: _,
333 thread_name: _,
334 join_status: _,
335 on_stack_empty: _, } = self;
337
338 for payload in panic_payload {
339 payload.visit_provenance(visit);
340 }
341 last_error.visit_provenance(visit);
342 for frame in stack {
343 frame.visit_provenance(visit)
344 }
345 }
346}
347
348impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
349 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
350 let return_place = self.return_place();
351 let Frame {
352 locals,
353 extra,
354 ..
356 } = self;
357
358 return_place.visit_provenance(visit);
360 for local in locals.iter() {
362 match local.as_mplace_or_imm() {
363 None => {}
364 Some(Either::Left((ptr, meta))) => {
365 ptr.visit_provenance(visit);
366 meta.visit_provenance(visit);
367 }
368 Some(Either::Right(imm)) => {
369 imm.visit_provenance(visit);
370 }
371 }
372 }
373
374 extra.visit_provenance(visit);
375 }
376}
377
378#[derive(Debug)]
380enum Timeout {
381 Monotonic(Instant),
382 RealTime(SystemTime),
383}
384
385impl Timeout {
386 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
388 match self {
389 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
390 Timeout::RealTime(time) =>
391 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
392 }
393 }
394
395 fn add_lossy(&self, duration: Duration) -> Self {
397 match self {
398 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
399 Timeout::RealTime(s) => {
400 Timeout::RealTime(
402 s.checked_add(duration)
403 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
404 )
405 }
406 }
407 }
408}
409
410#[derive(Debug, Copy, Clone, PartialEq)]
412pub enum TimeoutClock {
413 Monotonic,
414 RealTime,
415}
416
417#[derive(Debug, Copy, Clone)]
419pub enum TimeoutAnchor {
420 Relative,
421 Absolute,
422}
423
424#[derive(Debug, Copy, Clone)]
426pub enum ThreadLookupError {
427 InvalidId,
429 Terminated(ThreadId),
431}
432
433#[derive(Debug)]
435pub struct ThreadManager<'tcx> {
436 active_thread: ThreadId,
438 threads: IndexVec<ThreadId, Thread<'tcx>>,
442 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
444 yield_active_thread: bool,
447 fixed_scheduling: bool,
449}
450
451impl VisitProvenance for ThreadManager<'_> {
452 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
453 let ThreadManager {
454 threads,
455 thread_local_allocs,
456 active_thread: _,
457 yield_active_thread: _,
458 fixed_scheduling: _,
459 } = self;
460
461 for thread in threads {
462 thread.visit_provenance(visit);
463 }
464 for ptr in thread_local_allocs.values() {
465 ptr.visit_provenance(visit);
466 }
467 }
468}
469
470impl<'tcx> ThreadManager<'tcx> {
471 pub(crate) fn new(config: &MiriConfig) -> Self {
472 let mut threads = IndexVec::new();
473 threads.push(Thread::new(Some("main"), None));
475 Self {
476 active_thread: ThreadId::MAIN_THREAD,
477 threads,
478 thread_local_allocs: Default::default(),
479 yield_active_thread: false,
480 fixed_scheduling: config.fixed_scheduling,
481 }
482 }
483
484 pub(crate) fn init(
485 ecx: &mut MiriInterpCx<'tcx>,
486 on_main_stack_empty: StackEmptyCallback<'tcx>,
487 ) {
488 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
489 Some(on_main_stack_empty);
490 if ecx.tcx.sess.target.os != Os::Windows {
491 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
493 ThreadJoinStatus::Detached;
494 }
495 }
496
497 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
501 if let Ok(id) = id.try_into()
502 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
503 {
504 let thread_id = ThreadId(id);
505 if self.threads[thread_id].state.is_terminated() {
506 Err(ThreadLookupError::Terminated(thread_id))
507 } else {
508 Ok(thread_id)
509 }
510 } else {
511 Err(ThreadLookupError::InvalidId)
512 }
513 }
514
515 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
518 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
519 }
520
521 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
526 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
527 }
528
529 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
531 &self.threads[self.active_thread].stack
532 }
533
534 pub fn active_thread_stack_mut(
536 &mut self,
537 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
538 &mut self.threads[self.active_thread].stack
539 }
540
541 pub fn all_blocked_stacks(
542 &self,
543 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
544 self.threads
545 .iter_enumerated()
546 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
547 .map(|(id, t)| (id, &t.stack[..]))
548 }
549
550 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
552 let new_thread_id = ThreadId::new(self.threads.len());
553 self.threads.push(Thread::new(None, Some(on_stack_empty)));
554 new_thread_id
555 }
556
557 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
559 assert!(id.index() < self.threads.len());
560 info!(
561 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
562 self.get_thread_display_name(id),
563 self.get_thread_display_name(self.active_thread)
564 );
565 std::mem::replace(&mut self.active_thread, id)
566 }
567
568 pub fn active_thread(&self) -> ThreadId {
570 self.active_thread
571 }
572
573 pub fn get_total_thread_count(&self) -> usize {
575 self.threads.len()
576 }
577
578 pub fn get_live_thread_count(&self) -> usize {
581 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
582 }
583
584 fn has_terminated(&self, thread_id: ThreadId) -> bool {
586 self.threads[thread_id].state.is_terminated()
587 }
588
589 fn have_all_terminated(&self) -> bool {
591 self.threads.iter().all(|thread| thread.state.is_terminated())
592 }
593
594 fn enable_thread(&mut self, thread_id: ThreadId) {
596 assert!(self.has_terminated(thread_id));
597 self.threads[thread_id].state = ThreadState::Enabled;
598 }
599
600 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
602 &mut self.threads[self.active_thread]
603 }
604
605 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
607 &self.threads[self.active_thread]
608 }
609
610 pub fn thread_ref(&self, thread_id: ThreadId) -> &Thread<'tcx> {
611 &self.threads[thread_id]
612 }
613
614 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
623 trace!("detaching {:?}", id);
625
626 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
627 self.threads[id].join_status == ThreadJoinStatus::Detached
629 } else {
630 self.threads[id].join_status != ThreadJoinStatus::Joinable
631 };
632 if is_ub {
633 throw_ub_format!("trying to detach thread that was already detached or joined");
634 }
635
636 self.threads[id].join_status = ThreadJoinStatus::Detached;
637 interp_ok(())
638 }
639
640 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
642 self.threads[thread].thread_name = Some(new_thread_name);
643 }
644
645 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
647 self.threads[thread].thread_name()
648 }
649
650 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
651 self.threads[thread].thread_display_name(thread)
652 }
653
654 fn block_thread(
656 &mut self,
657 reason: BlockReason,
658 timeout: Option<Timeout>,
659 callback: DynUnblockCallback<'tcx>,
660 ) {
661 let state = &mut self.threads[self.active_thread].state;
662 assert!(state.is_enabled());
663 *state = ThreadState::Blocked { reason, timeout, callback }
664 }
665
666 fn yield_active_thread(&mut self) {
668 self.yield_active_thread = true;
672 }
673}
674
675impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
676trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
677 #[inline]
678 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
679 let this = self.eval_context_mut();
680 let active_thread = this.active_thread_mut();
681 active_thread.origin_span = DUMMY_SP; let mut callback = active_thread
683 .on_stack_empty
684 .take()
685 .expect("`on_stack_empty` not set up, or already running");
686 let res = callback(this)?;
687 this.active_thread_mut().on_stack_empty = Some(callback);
688 interp_ok(res)
689 }
690
691 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
700 let this = self.eval_context_mut();
701
702 if this.machine.data_race.as_genmc_ref().is_some() {
704 loop {
705 let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
706 let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
707 return interp_ok(SchedulingAction::ExecuteStep);
708 };
709 if this.machine.threads.threads[next_thread_id]
711 .state
712 .is_blocked_on(&BlockReason::Genmc)
713 {
714 info!(
715 "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
716 );
717 this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
718 }
719 let thread_manager = &mut this.machine.threads;
722 if thread_manager.threads[next_thread_id].state.is_enabled() {
723 thread_manager.active_thread = next_thread_id;
725 return interp_ok(SchedulingAction::ExecuteStep);
726 }
727 }
728 }
729
730 let thread_manager = &this.machine.threads;
732 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
734 && !thread_manager.yield_active_thread
735 {
736 return interp_ok(SchedulingAction::ExecuteStep);
738 }
739
740 if this.machine.communicate() {
744 this.poll_and_unblock(Some(Duration::ZERO))?;
751 }
752
753 let potential_sleep_time = this.unblock_expired_timeouts()?;
759
760 let thread_manager = &mut this.machine.threads;
761 let rng = this.machine.rng.get_mut();
762
763 let mut threads_iter = thread_manager
770 .threads
771 .iter_enumerated()
772 .skip(thread_manager.active_thread.index() + 1)
773 .chain(
774 thread_manager
775 .threads
776 .iter_enumerated()
777 .take(thread_manager.active_thread.index() + 1),
778 )
779 .filter(|(_id, thread)| thread.state.is_enabled());
780 let new_thread = if thread_manager.fixed_scheduling {
782 let next = threads_iter.next();
783 drop(threads_iter);
784 next
785 } else {
786 threads_iter.choose(rng)
787 };
788
789 if let Some((id, _thread)) = new_thread {
790 if thread_manager.active_thread != id {
791 info!(
792 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
793 thread_manager.get_thread_display_name(id),
794 thread_manager.get_thread_display_name(thread_manager.active_thread)
795 );
796 thread_manager.active_thread = id;
797 }
798 }
799 thread_manager.yield_active_thread = false;
801
802 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
803 return interp_ok(SchedulingAction::ExecuteStep);
804 }
805
806 let threads = &this.machine.threads.threads;
808
809 if threads.iter().all(|thread| thread.state.is_terminated()) {
810 unreachable!("all threads terminated without the main thread terminating?!");
811 } else if let Some(sleep_time) = potential_sleep_time {
812 interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
816 } else if threads.iter().any(|thread| this.is_thread_blocked_on_host(thread)) {
817 interp_ok(SchedulingAction::SleepAndWaitForIo(None))
821 } else {
822 throw_machine_stop!(TerminationInfo::GlobalDeadlock);
823 }
824 }
825
826 fn is_thread_blocked_on_host(&self, thread: &Thread<'tcx>) -> bool {
830 let this = self.eval_context_ref();
831 match &thread.state {
832 ThreadState::Blocked { reason: BlockReason::IO, .. } => true,
833 ThreadState::Blocked { reason: BlockReason::Epoll { epfd }, .. } =>
834 this.has_epoll_host_interests(epfd),
835 _ => false,
836 }
837 }
838
839 fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
845 let this = self.eval_context_mut();
846
847 match BlockingIoManager::poll(this, timeout)? {
848 Ok(_) => interp_ok(()),
849 Err(e) if e.kind() == io::ErrorKind::Interrupted => interp_ok(()),
851 Err(e) => panic!("unexpected error while polling: {e}"),
854 }
855 }
856
857 fn unblock_expired_timeouts(&mut self) -> InterpResult<'tcx, Option<Duration>> {
862 let this = self.eval_context_mut();
863 let clock = &this.machine.monotonic_clock;
864
865 let mut min_wait_time = Option::<Duration>::None;
866 let mut callbacks = Vec::new();
867
868 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
869 match &thread.state {
870 ThreadState::Blocked { timeout: Some(timeout), .. } => {
871 let wait_time = timeout.get_wait_time(clock);
872 if wait_time.is_zero() {
873 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
875 let ThreadState::Blocked { callback, .. } = old_state else {
876 unreachable!()
877 };
878 callbacks.push((id, callback));
880 } else {
881 min_wait_time = Some(wait_time.min(min_wait_time.unwrap_or(Duration::MAX)));
884 }
885 }
886 _ => {}
887 }
888 }
889
890 for (thread, callback) in callbacks {
891 let old_thread = this.machine.threads.set_active_thread_id(thread);
898 callback.call(this, UnblockKind::TimedOut)?;
899 this.machine.threads.set_active_thread_id(old_thread);
900 }
901
902 interp_ok(min_wait_time)
903 }
904}
905
906impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
908pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
909 #[inline]
910 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
911 self.eval_context_ref().machine.threads.thread_id_try_from(id)
912 }
913
914 fn get_or_create_thread_local_alloc(
917 &mut self,
918 def_id: DefId,
919 ) -> InterpResult<'tcx, StrictPointer> {
920 let this = self.eval_context_mut();
921 let tcx = this.tcx;
922 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
923 interp_ok(old_alloc)
926 } else {
927 if tcx.is_foreign_item(def_id) {
931 throw_unsup_format!("foreign thread-local statics are not supported");
932 }
933 let params = this.machine.get_default_alloc_params();
934 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
935 let mut alloc = alloc.inner().adjust_from_tcx(
937 &this.tcx,
938 |bytes, align| {
939 interp_ok(MiriAllocBytes::from_bytes(
940 std::borrow::Cow::Borrowed(bytes),
941 align,
942 params,
943 ))
944 },
945 |ptr| this.global_root_pointer(ptr),
946 )?;
947 alloc.mutability = Mutability::Mut;
949 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
951 this.machine.threads.set_thread_local_alloc(def_id, ptr);
952 interp_ok(ptr)
953 }
954 }
955
956 #[inline]
958 fn start_regular_thread(
959 &mut self,
960 thread: Option<MPlaceTy<'tcx>>,
961 start_routine: Pointer,
962 start_abi: ExternAbi,
963 func_arg: ImmTy<'tcx>,
964 ret_layout: TyAndLayout<'tcx>,
965 ) -> InterpResult<'tcx, ThreadId> {
966 let this = self.eval_context_mut();
967
968 let current_span = this.machine.current_user_relevant_span();
970 let new_thread_id = this.machine.threads.create_thread({
971 let mut state = tls::TlsDtorsState::default();
972 Box::new(move |m| state.on_stack_empty(m))
973 });
974 match &mut this.machine.data_race {
975 GlobalDataRaceHandler::None => {}
976 GlobalDataRaceHandler::Vclocks(data_race) =>
977 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
978 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
979 genmc_ctx.handle_thread_create(
980 &this.machine.threads,
981 start_routine,
982 &func_arg,
983 new_thread_id,
984 )?,
985 }
986 if let Some(thread_info_place) = thread {
989 this.write_scalar(
990 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
991 &thread_info_place,
992 )?;
993 }
994
995 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
998
999 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
1001 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
1002 }
1003
1004 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
1006
1007 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
1011
1012 this.call_thread_root_function(
1013 instance,
1014 start_abi,
1015 &[func_arg],
1016 Some(&ret_place),
1017 current_span,
1018 )?;
1019
1020 this.machine.threads.set_active_thread_id(old_thread_id);
1022
1023 interp_ok(new_thread_id)
1024 }
1025
1026 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
1031 let this = self.eval_context_mut();
1032
1033 let thread = this.active_thread_mut();
1035 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
1036 thread.state = ThreadState::Terminated;
1037
1038 let gone_thread = this.active_thread();
1040 {
1041 let mut free_tls_statics = Vec::new();
1042 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
1043 if thread != gone_thread {
1044 return true;
1046 }
1047 free_tls_statics.push(alloc_id);
1050 false
1051 });
1052 for ptr in free_tls_statics {
1054 match tls_alloc_action {
1055 TlsAllocAction::Deallocate =>
1056 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
1057 TlsAllocAction::Leak =>
1058 if let Some(alloc) = ptr.provenance.get_alloc_id() {
1059 trace!(
1060 "Thread-local static leaked and stored as static root: {:?}",
1061 alloc
1062 );
1063 this.machine.static_roots.push(alloc);
1064 },
1065 }
1066 }
1067 }
1068
1069 match &mut this.machine.data_race {
1070 GlobalDataRaceHandler::None => {}
1071 GlobalDataRaceHandler::Vclocks(data_race) =>
1072 data_race.thread_terminated(&this.machine.threads),
1073 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
1074 genmc_ctx.handle_thread_finish(&this.machine.threads)
1077 }
1078 }
1079
1080 let unblock_reason = BlockReason::Join(gone_thread);
1082 let threads = &this.machine.threads.threads;
1083 let joining_threads = threads
1084 .iter_enumerated()
1085 .filter(|(_, thread)| thread.state.is_blocked_on(&unblock_reason))
1086 .map(|(id, _)| id)
1087 .collect::<Vec<_>>();
1088 for thread in joining_threads {
1089 this.unblock_thread(thread, unblock_reason.clone())?;
1090 }
1091
1092 interp_ok(())
1093 }
1094
1095 #[inline]
1098 fn block_thread(
1099 &mut self,
1100 reason: BlockReason,
1101 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
1102 callback: DynUnblockCallback<'tcx>,
1103 ) {
1104 let this = self.eval_context_mut();
1105 if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1106 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1107 }
1108 let timeout = timeout.map(|(clock, anchor, duration)| {
1109 let anchor = match clock {
1110 TimeoutClock::RealTime => {
1111 assert!(
1112 this.machine.communicate(),
1113 "cannot have `RealTime` timeout with isolation enabled!"
1114 );
1115 Timeout::RealTime(match anchor {
1116 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1117 TimeoutAnchor::Relative => SystemTime::now(),
1118 })
1119 }
1120 TimeoutClock::Monotonic =>
1121 Timeout::Monotonic(match anchor {
1122 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
1123 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
1124 }),
1125 };
1126 anchor.add_lossy(duration)
1127 });
1128 this.machine.threads.block_thread(reason, timeout, callback);
1129 }
1130
1131 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1134 let this = self.eval_context_mut();
1135 let old_state =
1136 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1137 let callback = match old_state {
1138 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1139 assert_eq!(
1140 reason, actual_reason,
1141 "unblock_thread: thread was blocked for the wrong reason"
1142 );
1143 callback
1144 }
1145 _ => panic!("unblock_thread: thread was not blocked"),
1146 };
1147 let old_thread = this.machine.threads.set_active_thread_id(thread);
1149 callback.call(this, UnblockKind::Ready)?;
1150 this.machine.threads.set_active_thread_id(old_thread);
1151 interp_ok(())
1152 }
1153
1154 #[inline]
1155 fn detach_thread(
1156 &mut self,
1157 thread_id: ThreadId,
1158 allow_terminated_joined: bool,
1159 ) -> InterpResult<'tcx> {
1160 let this = self.eval_context_mut();
1161 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1162 }
1163
1164 fn join_thread(
1168 &mut self,
1169 joined_thread_id: ThreadId,
1170 success_retval: Scalar,
1171 return_dest: &MPlaceTy<'tcx>,
1172 ) -> InterpResult<'tcx> {
1173 let this = self.eval_context_mut();
1174 let thread_mgr = &mut this.machine.threads;
1175 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1176 throw_ub_format!("trying to join a detached thread");
1178 }
1179
1180 fn after_join<'tcx>(
1181 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1182 joined_thread_id: ThreadId,
1183 success_retval: Scalar,
1184 return_dest: &MPlaceTy<'tcx>,
1185 ) -> InterpResult<'tcx> {
1186 let threads = &this.machine.threads;
1187 match &mut this.machine.data_race {
1188 GlobalDataRaceHandler::None => {}
1189 GlobalDataRaceHandler::Vclocks(data_race) =>
1190 data_race.thread_joined(threads, joined_thread_id),
1191 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1192 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1193 }
1194 this.write_scalar(success_retval, return_dest)?;
1195 interp_ok(())
1196 }
1197
1198 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1201 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1202 trace!(
1203 "{:?} blocked on {:?} when trying to join",
1204 thread_mgr.active_thread, joined_thread_id
1205 );
1206 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1207 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1208 }
1209
1210 let dest = return_dest.clone();
1213 thread_mgr.block_thread(
1214 BlockReason::Join(joined_thread_id),
1215 None,
1216 callback!(
1217 @capture<'tcx> {
1218 joined_thread_id: ThreadId,
1219 dest: MPlaceTy<'tcx>,
1220 success_retval: Scalar,
1221 }
1222 |this, unblock: UnblockKind| {
1223 assert_eq!(unblock, UnblockKind::Ready);
1224 after_join(this, joined_thread_id, success_retval, &dest)
1225 }
1226 ),
1227 );
1228 } else {
1229 after_join(this, joined_thread_id, success_retval, return_dest)?;
1231 }
1232 interp_ok(())
1233 }
1234
1235 fn join_thread_exclusive(
1240 &mut self,
1241 joined_thread_id: ThreadId,
1242 success_retval: Scalar,
1243 return_dest: &MPlaceTy<'tcx>,
1244 ) -> InterpResult<'tcx> {
1245 let this = self.eval_context_mut();
1246 let threads = &this.machine.threads.threads;
1247 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1248 throw_ub_format!("trying to join an already joined thread");
1249 }
1250
1251 if joined_thread_id == this.machine.threads.active_thread {
1252 throw_ub_format!("trying to join itself");
1253 }
1254
1255 assert!(
1257 threads.iter().all(|thread| {
1258 !thread.state.is_blocked_on(&BlockReason::Join(joined_thread_id))
1259 }),
1260 "this thread already has threads waiting for its termination"
1261 );
1262
1263 this.join_thread(joined_thread_id, success_retval, return_dest)
1264 }
1265
1266 #[inline]
1267 fn active_thread(&self) -> ThreadId {
1268 let this = self.eval_context_ref();
1269 this.machine.threads.active_thread()
1270 }
1271
1272 #[inline]
1273 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1274 let this = self.eval_context_mut();
1275 this.machine.threads.active_thread_mut()
1276 }
1277
1278 #[inline]
1279 fn active_thread_ref(&self) -> &Thread<'tcx> {
1280 let this = self.eval_context_ref();
1281 this.machine.threads.active_thread_ref()
1282 }
1283
1284 #[inline]
1285 fn get_total_thread_count(&self) -> usize {
1286 let this = self.eval_context_ref();
1287 this.machine.threads.get_total_thread_count()
1288 }
1289
1290 #[inline]
1291 fn have_all_terminated(&self) -> bool {
1292 let this = self.eval_context_ref();
1293 this.machine.threads.have_all_terminated()
1294 }
1295
1296 #[inline]
1297 fn enable_thread(&mut self, thread_id: ThreadId) {
1298 let this = self.eval_context_mut();
1299 this.machine.threads.enable_thread(thread_id);
1300 }
1301
1302 #[inline]
1303 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1304 let this = self.eval_context_ref();
1305 this.machine.threads.active_thread_stack()
1306 }
1307
1308 #[inline]
1309 fn active_thread_stack_mut<'a>(
1310 &'a mut self,
1311 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1312 let this = self.eval_context_mut();
1313 this.machine.threads.active_thread_stack_mut()
1314 }
1315
1316 #[inline]
1318 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1319 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1320 }
1321
1322 #[inline]
1323 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1324 where
1325 'tcx: 'c,
1326 {
1327 self.eval_context_ref().machine.threads.get_thread_name(thread)
1328 }
1329
1330 #[inline]
1331 fn yield_active_thread(&mut self) {
1332 self.eval_context_mut().machine.threads.yield_active_thread();
1333 }
1334
1335 #[inline]
1336 fn maybe_preempt_active_thread(&mut self) {
1337 let this = self.eval_context_mut();
1338 if !this.machine.threads.fixed_scheduling
1339 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1340 {
1341 this.yield_active_thread();
1342 }
1343 }
1344
1345 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1348 let this = self.eval_context_mut();
1349 loop {
1350 if CTRL_C_RECEIVED.load(Relaxed) {
1351 this.machine.handle_abnormal_termination();
1352 throw_machine_stop!(TerminationInfo::Interrupted);
1353 }
1354 match this.schedule()? {
1355 SchedulingAction::ExecuteStep => {
1356 if !this.step()? {
1357 match this.run_on_stack_empty()? {
1359 Poll::Pending => {} Poll::Ready(()) =>
1361 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1362 }
1363 }
1364 }
1365 SchedulingAction::SleepAndWaitForIo(duration) => {
1366 if this.machine.communicate() {
1367 this.poll_and_unblock(duration)?;
1372 } else {
1373 let duration = duration.expect(
1374 "Infinite sleep should not be triggered when isolation is enabled",
1375 );
1376 this.machine.monotonic_clock.sleep(duration);
1377 }
1378 }
1379 }
1380 }
1381 }
1382}