1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rustc_abi::ExternAbi;
10use rustc_const_eval::CTRL_C_RECEIVED;
11use rustc_data_structures::fx::FxHashMap;
12use rustc_hir::def_id::DefId;
13use rustc_index::{Idx, IndexVec};
14use rustc_middle::mir::Mutability;
15use rustc_middle::ty::layout::TyAndLayout;
16use rustc_span::Span;
17
18use crate::concurrency::data_race;
19use crate::shims::tls;
20use crate::*;
21
22#[derive(Clone, Copy, Debug, PartialEq)]
23enum SchedulingAction {
24 ExecuteStep,
26 ExecuteTimeoutCallback,
28 Sleep(Duration),
30}
31
32#[derive(Clone, Copy, Debug, PartialEq)]
34pub enum TlsAllocAction {
35 Deallocate,
37 Leak,
40}
41
42#[derive(Clone, Copy, Debug, PartialEq)]
44pub enum UnblockKind {
45 Ready,
47 TimedOut,
49}
50
51pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
54
55#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
57pub struct ThreadId(u32);
58
59impl ThreadId {
60 pub fn to_u32(self) -> u32 {
61 self.0
62 }
63
64 pub fn new_unchecked(id: u32) -> Self {
66 Self(id)
67 }
68
69 pub const MAIN_THREAD: ThreadId = ThreadId(0);
70}
71
72impl Idx for ThreadId {
73 fn new(idx: usize) -> Self {
74 ThreadId(u32::try_from(idx).unwrap())
75 }
76
77 fn index(self) -> usize {
78 usize::try_from(self.0).unwrap()
79 }
80}
81
82impl From<ThreadId> for u64 {
83 fn from(t: ThreadId) -> Self {
84 t.0.into()
85 }
86}
87
88#[derive(Debug, Copy, Clone, PartialEq, Eq)]
90pub enum BlockReason {
91 Join(ThreadId),
94 Sleep,
96 Mutex,
98 Condvar(CondvarId),
100 RwLock(RwLockId),
102 Futex,
104 InitOnce(InitOnceId),
106 Epoll,
108 Eventfd,
110 UnnamedSocket,
112}
113
114enum ThreadState<'tcx> {
116 Enabled,
118 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
120 Terminated,
123}
124
125impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 match self {
128 Self::Enabled => write!(f, "Enabled"),
129 Self::Blocked { reason, timeout, .. } =>
130 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
131 Self::Terminated => write!(f, "Terminated"),
132 }
133 }
134}
135
136impl<'tcx> ThreadState<'tcx> {
137 fn is_enabled(&self) -> bool {
138 matches!(self, ThreadState::Enabled)
139 }
140
141 fn is_terminated(&self) -> bool {
142 matches!(self, ThreadState::Terminated)
143 }
144
145 fn is_blocked_on(&self, reason: BlockReason) -> bool {
146 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
147 }
148}
149
150#[derive(Debug, Copy, Clone, PartialEq, Eq)]
152enum ThreadJoinStatus {
153 Joinable,
155 Detached,
158 Joined,
160}
161
162pub struct Thread<'tcx> {
164 state: ThreadState<'tcx>,
165
166 thread_name: Option<Vec<u8>>,
168
169 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
171
172 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
177
178 top_user_relevant_frame: Option<usize>,
184
185 join_status: ThreadJoinStatus,
187
188 pub(crate) panic_payloads: Vec<ImmTy<'tcx>>,
197
198 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
200}
201
202pub type StackEmptyCallback<'tcx> =
203 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
204
205impl<'tcx> Thread<'tcx> {
206 fn thread_name(&self) -> Option<&[u8]> {
208 self.thread_name.as_deref()
209 }
210
211 fn thread_display_name(&self, id: ThreadId) -> String {
213 if let Some(ref thread_name) = self.thread_name {
214 String::from_utf8_lossy(thread_name).into_owned()
215 } else {
216 format!("unnamed-{}", id.index())
217 }
218 }
219
220 fn compute_top_user_relevant_frame(&self) -> Option<usize> {
225 self.stack
226 .iter()
227 .enumerate()
228 .rev()
229 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
230 }
231
232 pub fn recompute_top_user_relevant_frame(&mut self) {
234 self.top_user_relevant_frame = self.compute_top_user_relevant_frame();
235 }
236
237 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
240 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame());
241 self.top_user_relevant_frame = Some(frame_idx);
242 }
243
244 pub fn top_user_relevant_frame(&self) -> Option<usize> {
247 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame());
248 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
252 }
253
254 pub fn current_span(&self) -> Span {
255 self.top_user_relevant_frame()
256 .map(|frame_idx| self.stack[frame_idx].current_span())
257 .unwrap_or(rustc_span::DUMMY_SP)
258 }
259}
260
261impl<'tcx> std::fmt::Debug for Thread<'tcx> {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 write!(
264 f,
265 "{}({:?}, {:?})",
266 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
267 self.state,
268 self.join_status
269 )
270 }
271}
272
273impl<'tcx> Thread<'tcx> {
274 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
275 Self {
276 state: ThreadState::Enabled,
277 thread_name: name.map(|name| Vec::from(name.as_bytes())),
278 stack: Vec::new(),
279 top_user_relevant_frame: None,
280 join_status: ThreadJoinStatus::Joinable,
281 panic_payloads: Vec::new(),
282 last_error: None,
283 on_stack_empty,
284 }
285 }
286}
287
288impl VisitProvenance for Thread<'_> {
289 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
290 let Thread {
291 panic_payloads: panic_payload,
292 last_error,
293 stack,
294 top_user_relevant_frame: _,
295 state: _,
296 thread_name: _,
297 join_status: _,
298 on_stack_empty: _, } = self;
300
301 for payload in panic_payload {
302 payload.visit_provenance(visit);
303 }
304 last_error.visit_provenance(visit);
305 for frame in stack {
306 frame.visit_provenance(visit)
307 }
308 }
309}
310
311impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
312 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
313 let Frame {
314 return_place,
315 locals,
316 extra,
317 ..
319 } = self;
320
321 return_place.visit_provenance(visit);
323 for local in locals.iter() {
325 match local.as_mplace_or_imm() {
326 None => {}
327 Some(Either::Left((ptr, meta))) => {
328 ptr.visit_provenance(visit);
329 meta.visit_provenance(visit);
330 }
331 Some(Either::Right(imm)) => {
332 imm.visit_provenance(visit);
333 }
334 }
335 }
336
337 extra.visit_provenance(visit);
338 }
339}
340
341#[derive(Debug)]
343enum Timeout {
344 Monotonic(Instant),
345 RealTime(SystemTime),
346}
347
348impl Timeout {
349 fn get_wait_time(&self, clock: &Clock) -> Duration {
351 match self {
352 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
353 Timeout::RealTime(time) =>
354 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
355 }
356 }
357
358 fn add_lossy(&self, duration: Duration) -> Self {
360 match self {
361 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
362 Timeout::RealTime(s) => {
363 Timeout::RealTime(
365 s.checked_add(duration)
366 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
367 )
368 }
369 }
370 }
371}
372
373#[derive(Debug, Copy, Clone)]
375pub enum TimeoutClock {
376 Monotonic,
377 RealTime,
378}
379
380#[derive(Debug, Copy, Clone)]
382pub enum TimeoutAnchor {
383 Relative,
384 Absolute,
385}
386
387#[derive(Debug, Copy, Clone)]
389pub struct ThreadNotFound;
390
391#[derive(Debug)]
393pub struct ThreadManager<'tcx> {
394 active_thread: ThreadId,
396 threads: IndexVec<ThreadId, Thread<'tcx>>,
400 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
402 yield_active_thread: bool,
404}
405
406impl VisitProvenance for ThreadManager<'_> {
407 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
408 let ThreadManager {
409 threads,
410 thread_local_allocs,
411 active_thread: _,
412 yield_active_thread: _,
413 } = self;
414
415 for thread in threads {
416 thread.visit_provenance(visit);
417 }
418 for ptr in thread_local_allocs.values() {
419 ptr.visit_provenance(visit);
420 }
421 }
422}
423
424impl<'tcx> Default for ThreadManager<'tcx> {
425 fn default() -> Self {
426 let mut threads = IndexVec::new();
427 threads.push(Thread::new(Some("main"), None));
429 Self {
430 active_thread: ThreadId::MAIN_THREAD,
431 threads,
432 thread_local_allocs: Default::default(),
433 yield_active_thread: false,
434 }
435 }
436}
437
438impl<'tcx> ThreadManager<'tcx> {
439 pub(crate) fn init(
440 ecx: &mut MiriInterpCx<'tcx>,
441 on_main_stack_empty: StackEmptyCallback<'tcx>,
442 ) {
443 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
444 Some(on_main_stack_empty);
445 if ecx.tcx.sess.target.os.as_ref() != "windows" {
446 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
448 ThreadJoinStatus::Detached;
449 }
450 }
451
452 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
453 if let Ok(id) = id.try_into()
454 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
455 {
456 Ok(ThreadId(id))
457 } else {
458 Err(ThreadNotFound)
459 }
460 }
461
462 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
465 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
466 }
467
468 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
473 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
474 }
475
476 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
478 &self.threads[self.active_thread].stack
479 }
480
481 pub fn active_thread_stack_mut(
483 &mut self,
484 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
485 &mut self.threads[self.active_thread].stack
486 }
487
488 pub fn all_stacks(
489 &self,
490 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
491 self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
492 }
493
494 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
496 let new_thread_id = ThreadId::new(self.threads.len());
497 self.threads.push(Thread::new(None, Some(on_stack_empty)));
498 new_thread_id
499 }
500
501 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
503 assert!(id.index() < self.threads.len());
504 info!(
505 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
506 self.get_thread_display_name(id),
507 self.get_thread_display_name(self.active_thread)
508 );
509 std::mem::replace(&mut self.active_thread, id)
510 }
511
512 pub fn active_thread(&self) -> ThreadId {
514 self.active_thread
515 }
516
517 pub fn get_total_thread_count(&self) -> usize {
519 self.threads.len()
520 }
521
522 pub fn get_live_thread_count(&self) -> usize {
525 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
526 }
527
528 fn has_terminated(&self, thread_id: ThreadId) -> bool {
530 self.threads[thread_id].state.is_terminated()
531 }
532
533 fn have_all_terminated(&self) -> bool {
535 self.threads.iter().all(|thread| thread.state.is_terminated())
536 }
537
538 fn enable_thread(&mut self, thread_id: ThreadId) {
540 assert!(self.has_terminated(thread_id));
541 self.threads[thread_id].state = ThreadState::Enabled;
542 }
543
544 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
546 &mut self.threads[self.active_thread]
547 }
548
549 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
551 &self.threads[self.active_thread]
552 }
553
554 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
563 trace!("detaching {:?}", id);
564
565 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
566 self.threads[id].join_status == ThreadJoinStatus::Detached
568 } else {
569 self.threads[id].join_status != ThreadJoinStatus::Joinable
570 };
571 if is_ub {
572 throw_ub_format!("trying to detach thread that was already detached or joined");
573 }
574
575 self.threads[id].join_status = ThreadJoinStatus::Detached;
576 interp_ok(())
577 }
578
579 fn join_thread(
581 &mut self,
582 joined_thread_id: ThreadId,
583 data_race: Option<&mut data_race::GlobalState>,
584 ) -> InterpResult<'tcx> {
585 if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
586 throw_ub_format!("trying to join a detached thread");
588 }
589
590 self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
593 if !self.threads[joined_thread_id].state.is_terminated() {
594 trace!(
595 "{:?} blocked on {:?} when trying to join",
596 self.active_thread, joined_thread_id
597 );
598 self.block_thread(
601 BlockReason::Join(joined_thread_id),
602 None,
603 callback!(
604 @capture<'tcx> {
605 joined_thread_id: ThreadId,
606 }
607 |this, unblock: UnblockKind| {
608 assert_eq!(unblock, UnblockKind::Ready);
609 if let Some(data_race) = &mut this.machine.data_race {
610 data_race.thread_joined(&this.machine.threads, joined_thread_id);
611 }
612 interp_ok(())
613 }
614 ),
615 );
616 } else {
617 if let Some(data_race) = data_race {
619 data_race.thread_joined(self, joined_thread_id);
620 }
621 }
622 interp_ok(())
623 }
624
625 fn join_thread_exclusive(
628 &mut self,
629 joined_thread_id: ThreadId,
630 data_race: Option<&mut data_race::GlobalState>,
631 ) -> InterpResult<'tcx> {
632 if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
633 throw_ub_format!("trying to join an already joined thread");
634 }
635
636 if joined_thread_id == self.active_thread {
637 throw_ub_format!("trying to join itself");
638 }
639
640 assert!(
642 self.threads
643 .iter()
644 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
645 "this thread already has threads waiting for its termination"
646 );
647
648 self.join_thread(joined_thread_id, data_race)
649 }
650
651 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
653 self.threads[thread].thread_name = Some(new_thread_name);
654 }
655
656 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
658 self.threads[thread].thread_name()
659 }
660
661 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
662 self.threads[thread].thread_display_name(thread)
663 }
664
665 fn block_thread(
667 &mut self,
668 reason: BlockReason,
669 timeout: Option<Timeout>,
670 callback: DynUnblockCallback<'tcx>,
671 ) {
672 let state = &mut self.threads[self.active_thread].state;
673 assert!(state.is_enabled());
674 *state = ThreadState::Blocked { reason, timeout, callback }
675 }
676
677 fn yield_active_thread(&mut self) {
679 self.yield_active_thread = true;
683 }
684
685 fn next_callback_wait_time(&self, clock: &Clock) -> Option<Duration> {
687 self.threads
688 .iter()
689 .filter_map(|t| {
690 match &t.state {
691 ThreadState::Blocked { timeout: Some(timeout), .. } =>
692 Some(timeout.get_wait_time(clock)),
693 _ => None,
694 }
695 })
696 .min()
697 }
698
699 fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
706 if self.threads[self.active_thread].state.is_enabled() && !self.yield_active_thread {
708 return interp_ok(SchedulingAction::ExecuteStep);
710 }
711 let potential_sleep_time = self.next_callback_wait_time(clock);
718 if potential_sleep_time == Some(Duration::ZERO) {
719 return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
720 }
721 let threads = self
730 .threads
731 .iter_enumerated()
732 .skip(self.active_thread.index() + 1)
733 .chain(self.threads.iter_enumerated().take(self.active_thread.index()));
734 for (id, thread) in threads {
735 debug_assert_ne!(self.active_thread, id);
736 if thread.state.is_enabled() {
737 info!(
738 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
739 self.get_thread_display_name(id),
740 self.get_thread_display_name(self.active_thread)
741 );
742 self.active_thread = id;
743 break;
744 }
745 }
746 self.yield_active_thread = false;
747 if self.threads[self.active_thread].state.is_enabled() {
748 return interp_ok(SchedulingAction::ExecuteStep);
749 }
750 if self.threads.iter().all(|thread| thread.state.is_terminated()) {
752 unreachable!("all threads terminated without the main thread terminating?!");
753 } else if let Some(sleep_time) = potential_sleep_time {
754 interp_ok(SchedulingAction::Sleep(sleep_time))
758 } else {
759 throw_machine_stop!(TerminationInfo::Deadlock);
760 }
761 }
762}
763
764impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
765trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
766 #[inline]
768 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
769 let this = self.eval_context_mut();
770 let mut found_callback = None;
771 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
773 match &thread.state {
774 ThreadState::Blocked { timeout: Some(timeout), .. }
775 if timeout.get_wait_time(&this.machine.clock) == Duration::ZERO =>
776 {
777 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
778 let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
779 found_callback = Some((id, callback));
780 break;
782 }
783 _ => {}
784 }
785 }
786 if let Some((thread, callback)) = found_callback {
787 let old_thread = this.machine.threads.set_active_thread_id(thread);
794 callback.call(this, UnblockKind::TimedOut)?;
795 this.machine.threads.set_active_thread_id(old_thread);
796 }
797 interp_ok(())
804 }
805
806 #[inline]
807 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
808 let this = self.eval_context_mut();
809 let mut callback = this
810 .active_thread_mut()
811 .on_stack_empty
812 .take()
813 .expect("`on_stack_empty` not set up, or already running");
814 let res = callback(this)?;
815 this.active_thread_mut().on_stack_empty = Some(callback);
816 interp_ok(res)
817 }
818}
819
820impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
822pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
823 #[inline]
824 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
825 self.eval_context_ref().machine.threads.thread_id_try_from(id)
826 }
827
828 fn get_or_create_thread_local_alloc(
831 &mut self,
832 def_id: DefId,
833 ) -> InterpResult<'tcx, StrictPointer> {
834 let this = self.eval_context_mut();
835 let tcx = this.tcx;
836 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
837 interp_ok(old_alloc)
840 } else {
841 if tcx.is_foreign_item(def_id) {
845 throw_unsup_format!("foreign thread-local statics are not supported");
846 }
847 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
848 let mut alloc = alloc.inner().adjust_from_tcx(
850 &this.tcx,
851 |bytes, align| {
852 interp_ok(MiriAllocBytes::from_bytes(std::borrow::Cow::Borrowed(bytes), align))
853 },
854 |ptr| this.global_root_pointer(ptr),
855 )?;
856 alloc.mutability = Mutability::Mut;
858 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
860 this.machine.threads.set_thread_local_alloc(def_id, ptr);
861 interp_ok(ptr)
862 }
863 }
864
865 #[inline]
867 fn start_regular_thread(
868 &mut self,
869 thread: Option<MPlaceTy<'tcx>>,
870 start_routine: Pointer,
871 start_abi: ExternAbi,
872 func_arg: ImmTy<'tcx>,
873 ret_layout: TyAndLayout<'tcx>,
874 ) -> InterpResult<'tcx, ThreadId> {
875 let this = self.eval_context_mut();
876
877 let new_thread_id = this.machine.threads.create_thread({
879 let mut state = tls::TlsDtorsState::default();
880 Box::new(move |m| state.on_stack_empty(m))
881 });
882 let current_span = this.machine.current_span();
883 if let Some(data_race) = &mut this.machine.data_race {
884 data_race.thread_created(&this.machine.threads, new_thread_id, current_span);
885 }
886
887 if let Some(thread_info_place) = thread {
890 this.write_scalar(
891 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
892 &thread_info_place,
893 )?;
894 }
895
896 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
899
900 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
902 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
903 }
904
905 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
907
908 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
912
913 this.call_function(
914 instance,
915 start_abi,
916 &[func_arg],
917 Some(&ret_place),
918 StackPopCleanup::Root { cleanup: true },
919 )?;
920
921 this.machine.threads.set_active_thread_id(old_thread_id);
923
924 interp_ok(new_thread_id)
925 }
926
927 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
932 let this = self.eval_context_mut();
933 let thread = this.active_thread_mut();
935 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
936 thread.state = ThreadState::Terminated;
937 if let Some(ref mut data_race) = this.machine.data_race {
938 data_race.thread_terminated(&this.machine.threads);
939 }
940 let gone_thread = this.active_thread();
942 {
943 let mut free_tls_statics = Vec::new();
944 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
945 if thread != gone_thread {
946 return true;
948 }
949 free_tls_statics.push(alloc_id);
952 false
953 });
954 for ptr in free_tls_statics {
956 match tls_alloc_action {
957 TlsAllocAction::Deallocate =>
958 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
959 TlsAllocAction::Leak =>
960 if let Some(alloc) = ptr.provenance.get_alloc_id() {
961 trace!(
962 "Thread-local static leaked and stored as static root: {:?}",
963 alloc
964 );
965 this.machine.static_roots.push(alloc);
966 },
967 }
968 }
969 }
970 let unblock_reason = BlockReason::Join(gone_thread);
972 let threads = &this.machine.threads.threads;
973 let joining_threads = threads
974 .iter_enumerated()
975 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
976 .map(|(id, _)| id)
977 .collect::<Vec<_>>();
978 for thread in joining_threads {
979 this.unblock_thread(thread, unblock_reason)?;
980 }
981
982 interp_ok(())
983 }
984
985 #[inline]
988 fn block_thread(
989 &mut self,
990 reason: BlockReason,
991 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
992 callback: DynUnblockCallback<'tcx>,
993 ) {
994 let this = self.eval_context_mut();
995 let timeout = timeout.map(|(clock, anchor, duration)| {
996 let anchor = match clock {
997 TimeoutClock::RealTime => {
998 assert!(
999 this.machine.communicate(),
1000 "cannot have `RealTime` timeout with isolation enabled!"
1001 );
1002 Timeout::RealTime(match anchor {
1003 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1004 TimeoutAnchor::Relative => SystemTime::now(),
1005 })
1006 }
1007 TimeoutClock::Monotonic =>
1008 Timeout::Monotonic(match anchor {
1009 TimeoutAnchor::Absolute => this.machine.clock.epoch(),
1010 TimeoutAnchor::Relative => this.machine.clock.now(),
1011 }),
1012 };
1013 anchor.add_lossy(duration)
1014 });
1015 this.machine.threads.block_thread(reason, timeout, callback);
1016 }
1017
1018 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1021 let this = self.eval_context_mut();
1022 let old_state =
1023 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1024 let callback = match old_state {
1025 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1026 assert_eq!(
1027 reason, actual_reason,
1028 "unblock_thread: thread was blocked for the wrong reason"
1029 );
1030 callback
1031 }
1032 _ => panic!("unblock_thread: thread was not blocked"),
1033 };
1034 let old_thread = this.machine.threads.set_active_thread_id(thread);
1036 callback.call(this, UnblockKind::Ready)?;
1037 this.machine.threads.set_active_thread_id(old_thread);
1038 interp_ok(())
1039 }
1040
1041 #[inline]
1042 fn detach_thread(
1043 &mut self,
1044 thread_id: ThreadId,
1045 allow_terminated_joined: bool,
1046 ) -> InterpResult<'tcx> {
1047 let this = self.eval_context_mut();
1048 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1049 }
1050
1051 #[inline]
1052 fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
1053 let this = self.eval_context_mut();
1054 this.machine.threads.join_thread(joined_thread_id, this.machine.data_race.as_mut())?;
1055 interp_ok(())
1056 }
1057
1058 #[inline]
1059 fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
1060 let this = self.eval_context_mut();
1061 this.machine
1062 .threads
1063 .join_thread_exclusive(joined_thread_id, this.machine.data_race.as_mut())?;
1064 interp_ok(())
1065 }
1066
1067 #[inline]
1068 fn active_thread(&self) -> ThreadId {
1069 let this = self.eval_context_ref();
1070 this.machine.threads.active_thread()
1071 }
1072
1073 #[inline]
1074 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1075 let this = self.eval_context_mut();
1076 this.machine.threads.active_thread_mut()
1077 }
1078
1079 #[inline]
1080 fn active_thread_ref(&self) -> &Thread<'tcx> {
1081 let this = self.eval_context_ref();
1082 this.machine.threads.active_thread_ref()
1083 }
1084
1085 #[inline]
1086 fn get_total_thread_count(&self) -> usize {
1087 let this = self.eval_context_ref();
1088 this.machine.threads.get_total_thread_count()
1089 }
1090
1091 #[inline]
1092 fn have_all_terminated(&self) -> bool {
1093 let this = self.eval_context_ref();
1094 this.machine.threads.have_all_terminated()
1095 }
1096
1097 #[inline]
1098 fn enable_thread(&mut self, thread_id: ThreadId) {
1099 let this = self.eval_context_mut();
1100 this.machine.threads.enable_thread(thread_id);
1101 }
1102
1103 #[inline]
1104 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1105 let this = self.eval_context_ref();
1106 this.machine.threads.active_thread_stack()
1107 }
1108
1109 #[inline]
1110 fn active_thread_stack_mut<'a>(
1111 &'a mut self,
1112 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1113 let this = self.eval_context_mut();
1114 this.machine.threads.active_thread_stack_mut()
1115 }
1116
1117 #[inline]
1119 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1120 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1121 }
1122
1123 #[inline]
1124 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1125 where
1126 'tcx: 'c,
1127 {
1128 self.eval_context_ref().machine.threads.get_thread_name(thread)
1129 }
1130
1131 #[inline]
1132 fn yield_active_thread(&mut self) {
1133 self.eval_context_mut().machine.threads.yield_active_thread();
1134 }
1135
1136 #[inline]
1137 fn maybe_preempt_active_thread(&mut self) {
1138 use rand::Rng as _;
1139
1140 let this = self.eval_context_mut();
1141 if this.machine.rng.get_mut().random_bool(this.machine.preemption_rate) {
1142 this.yield_active_thread();
1143 }
1144 }
1145
1146 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1149 let this = self.eval_context_mut();
1150 loop {
1151 if CTRL_C_RECEIVED.load(Relaxed) {
1152 this.machine.handle_abnormal_termination();
1153 std::process::exit(1);
1154 }
1155 match this.machine.threads.schedule(&this.machine.clock)? {
1156 SchedulingAction::ExecuteStep => {
1157 if !this.step()? {
1158 match this.run_on_stack_empty()? {
1160 Poll::Pending => {} Poll::Ready(()) =>
1162 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1163 }
1164 }
1165 }
1166 SchedulingAction::ExecuteTimeoutCallback => {
1167 this.run_timeout_callback()?;
1168 }
1169 SchedulingAction::Sleep(duration) => {
1170 this.machine.clock.sleep(duration);
1171 }
1172 }
1173 }
1174 }
1175}