1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use either::Either;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::fx::FxHashMap;
13use rustc_hir::def_id::DefId;
14use rustc_index::{Idx, IndexVec};
15use rustc_middle::mir::Mutability;
16use rustc_middle::ty::layout::TyAndLayout;
17use rustc_span::Span;
18
19use crate::concurrency::GlobalDataRaceHandler;
20use crate::shims::tls;
21use crate::*;
22
23#[derive(Clone, Copy, Debug, PartialEq)]
24enum SchedulingAction {
25 ExecuteStep,
27 ExecuteTimeoutCallback,
29 Sleep(Duration),
31}
32
33#[derive(Clone, Copy, Debug, PartialEq)]
35pub enum TlsAllocAction {
36 Deallocate,
38 Leak,
41}
42
43#[derive(Clone, Copy, Debug, PartialEq)]
45pub enum UnblockKind {
46 Ready,
48 TimedOut,
50}
51
52pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
55
56#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
58pub struct ThreadId(u32);
59
60impl ThreadId {
61 pub fn to_u32(self) -> u32 {
62 self.0
63 }
64
65 pub fn new_unchecked(id: u32) -> Self {
67 Self(id)
68 }
69
70 pub const MAIN_THREAD: ThreadId = ThreadId(0);
71}
72
73impl Idx for ThreadId {
74 fn new(idx: usize) -> Self {
75 ThreadId(u32::try_from(idx).unwrap())
76 }
77
78 fn index(self) -> usize {
79 usize::try_from(self.0).unwrap()
80 }
81}
82
83impl From<ThreadId> for u64 {
84 fn from(t: ThreadId) -> Self {
85 t.0.into()
86 }
87}
88
89#[derive(Debug, Copy, Clone, PartialEq, Eq)]
91pub enum BlockReason {
92 Join(ThreadId),
95 Sleep,
97 Mutex,
99 Condvar,
101 RwLock,
103 Futex,
105 InitOnce,
107 Epoll,
109 Eventfd,
111 UnnamedSocket,
113}
114
115enum ThreadState<'tcx> {
117 Enabled,
119 Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
121 Terminated,
124}
125
126impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 match self {
129 Self::Enabled => write!(f, "Enabled"),
130 Self::Blocked { reason, timeout, .. } =>
131 f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
132 Self::Terminated => write!(f, "Terminated"),
133 }
134 }
135}
136
137impl<'tcx> ThreadState<'tcx> {
138 fn is_enabled(&self) -> bool {
139 matches!(self, ThreadState::Enabled)
140 }
141
142 fn is_terminated(&self) -> bool {
143 matches!(self, ThreadState::Terminated)
144 }
145
146 fn is_blocked_on(&self, reason: BlockReason) -> bool {
147 matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
148 }
149}
150
151#[derive(Debug, Copy, Clone, PartialEq, Eq)]
153enum ThreadJoinStatus {
154 Joinable,
156 Detached,
159 Joined,
161}
162
163pub struct Thread<'tcx> {
165 state: ThreadState<'tcx>,
166
167 thread_name: Option<Vec<u8>>,
169
170 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
172
173 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
178
179 top_user_relevant_frame: Option<usize>,
185
186 join_status: ThreadJoinStatus,
188
189 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
198
199 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
201}
202
203pub type StackEmptyCallback<'tcx> =
204 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
205
206impl<'tcx> Thread<'tcx> {
207 fn thread_name(&self) -> Option<&[u8]> {
209 self.thread_name.as_deref()
210 }
211
212 pub fn is_enabled(&self) -> bool {
214 self.state.is_enabled()
215 }
216
217 fn thread_display_name(&self, id: ThreadId) -> String {
219 if let Some(ref thread_name) = self.thread_name {
220 String::from_utf8_lossy(thread_name).into_owned()
221 } else {
222 format!("unnamed-{}", id.index())
223 }
224 }
225
226 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
232 self.stack
233 .iter()
234 .enumerate()
235 .rev()
236 .skip(skip)
237 .find_map(|(idx, frame)| if frame.extra.is_user_relevant { Some(idx) } else { None })
238 }
239
240 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
243 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
244 }
245
246 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
249 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
250 self.top_user_relevant_frame = Some(frame_idx);
251 }
252
253 pub fn top_user_relevant_frame(&self) -> Option<usize> {
256 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
257 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
261 }
262
263 pub fn current_span(&self) -> Span {
264 self.top_user_relevant_frame()
265 .map(|frame_idx| self.stack[frame_idx].current_span())
266 .unwrap_or(rustc_span::DUMMY_SP)
267 }
268}
269
270impl<'tcx> std::fmt::Debug for Thread<'tcx> {
271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 write!(
273 f,
274 "{}({:?}, {:?})",
275 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
276 self.state,
277 self.join_status
278 )
279 }
280}
281
282impl<'tcx> Thread<'tcx> {
283 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
284 Self {
285 state: ThreadState::Enabled,
286 thread_name: name.map(|name| Vec::from(name.as_bytes())),
287 stack: Vec::new(),
288 top_user_relevant_frame: None,
289 join_status: ThreadJoinStatus::Joinable,
290 unwind_payloads: Vec::new(),
291 last_error: None,
292 on_stack_empty,
293 }
294 }
295}
296
297impl VisitProvenance for Thread<'_> {
298 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
299 let Thread {
300 unwind_payloads: panic_payload,
301 last_error,
302 stack,
303 top_user_relevant_frame: _,
304 state: _,
305 thread_name: _,
306 join_status: _,
307 on_stack_empty: _, } = self;
309
310 for payload in panic_payload {
311 payload.visit_provenance(visit);
312 }
313 last_error.visit_provenance(visit);
314 for frame in stack {
315 frame.visit_provenance(visit)
316 }
317 }
318}
319
320impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
321 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
322 let Frame {
323 return_place,
324 locals,
325 extra,
326 ..
328 } = self;
329
330 return_place.visit_provenance(visit);
332 for local in locals.iter() {
334 match local.as_mplace_or_imm() {
335 None => {}
336 Some(Either::Left((ptr, meta))) => {
337 ptr.visit_provenance(visit);
338 meta.visit_provenance(visit);
339 }
340 Some(Either::Right(imm)) => {
341 imm.visit_provenance(visit);
342 }
343 }
344 }
345
346 extra.visit_provenance(visit);
347 }
348}
349
350#[derive(Debug)]
352enum Timeout {
353 Monotonic(Instant),
354 RealTime(SystemTime),
355}
356
357impl Timeout {
358 fn get_wait_time(&self, clock: &MonotonicClock) -> Duration {
360 match self {
361 Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
362 Timeout::RealTime(time) =>
363 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
364 }
365 }
366
367 fn add_lossy(&self, duration: Duration) -> Self {
369 match self {
370 Timeout::Monotonic(i) => Timeout::Monotonic(i.add_lossy(duration)),
371 Timeout::RealTime(s) => {
372 Timeout::RealTime(
374 s.checked_add(duration)
375 .unwrap_or_else(|| s.checked_add(Duration::from_secs(3600)).unwrap()),
376 )
377 }
378 }
379 }
380}
381
382#[derive(Debug, Copy, Clone, PartialEq)]
384pub enum TimeoutClock {
385 Monotonic,
386 RealTime,
387}
388
389#[derive(Debug, Copy, Clone)]
391pub enum TimeoutAnchor {
392 Relative,
393 Absolute,
394}
395
396#[derive(Debug, Copy, Clone)]
398pub struct ThreadNotFound;
399
400#[derive(Debug)]
402pub struct ThreadManager<'tcx> {
403 active_thread: ThreadId,
405 threads: IndexVec<ThreadId, Thread<'tcx>>,
409 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
411 yield_active_thread: bool,
414 fixed_scheduling: bool,
416}
417
418impl VisitProvenance for ThreadManager<'_> {
419 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
420 let ThreadManager {
421 threads,
422 thread_local_allocs,
423 active_thread: _,
424 yield_active_thread: _,
425 fixed_scheduling: _,
426 } = self;
427
428 for thread in threads {
429 thread.visit_provenance(visit);
430 }
431 for ptr in thread_local_allocs.values() {
432 ptr.visit_provenance(visit);
433 }
434 }
435}
436
437impl<'tcx> ThreadManager<'tcx> {
438 pub(crate) fn new(config: &MiriConfig) -> Self {
439 let mut threads = IndexVec::new();
440 threads.push(Thread::new(Some("main"), None));
442 Self {
443 active_thread: ThreadId::MAIN_THREAD,
444 threads,
445 thread_local_allocs: Default::default(),
446 yield_active_thread: false,
447 fixed_scheduling: config.fixed_scheduling,
448 }
449 }
450
451 pub(crate) fn init(
452 ecx: &mut MiriInterpCx<'tcx>,
453 on_main_stack_empty: StackEmptyCallback<'tcx>,
454 ) {
455 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
456 Some(on_main_stack_empty);
457 if ecx.tcx.sess.target.os.as_ref() != "windows" {
458 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
460 ThreadJoinStatus::Detached;
461 }
462 }
463
464 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
465 if let Ok(id) = id.try_into()
466 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
467 {
468 Ok(ThreadId(id))
469 } else {
470 Err(ThreadNotFound)
471 }
472 }
473
474 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
477 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
478 }
479
480 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
485 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
486 }
487
488 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
490 &self.threads[self.active_thread].stack
491 }
492
493 pub fn active_thread_stack_mut(
495 &mut self,
496 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
497 &mut self.threads[self.active_thread].stack
498 }
499
500 pub fn all_stacks(
501 &self,
502 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
503 self.threads.iter_enumerated().map(|(id, t)| (id, &t.stack[..]))
504 }
505
506 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
508 let new_thread_id = ThreadId::new(self.threads.len());
509 self.threads.push(Thread::new(None, Some(on_stack_empty)));
510 new_thread_id
511 }
512
513 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
515 assert!(id.index() < self.threads.len());
516 info!(
517 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
518 self.get_thread_display_name(id),
519 self.get_thread_display_name(self.active_thread)
520 );
521 std::mem::replace(&mut self.active_thread, id)
522 }
523
524 pub fn active_thread(&self) -> ThreadId {
526 self.active_thread
527 }
528
529 pub fn get_total_thread_count(&self) -> usize {
531 self.threads.len()
532 }
533
534 pub fn get_live_thread_count(&self) -> usize {
537 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
538 }
539
540 fn has_terminated(&self, thread_id: ThreadId) -> bool {
542 self.threads[thread_id].state.is_terminated()
543 }
544
545 fn have_all_terminated(&self) -> bool {
547 self.threads.iter().all(|thread| thread.state.is_terminated())
548 }
549
550 fn enable_thread(&mut self, thread_id: ThreadId) {
552 assert!(self.has_terminated(thread_id));
553 self.threads[thread_id].state = ThreadState::Enabled;
554 }
555
556 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
558 &mut self.threads[self.active_thread]
559 }
560
561 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
563 &self.threads[self.active_thread]
564 }
565
566 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
575 trace!("detaching {:?}", id);
576
577 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
578 self.threads[id].join_status == ThreadJoinStatus::Detached
580 } else {
581 self.threads[id].join_status != ThreadJoinStatus::Joinable
582 };
583 if is_ub {
584 throw_ub_format!("trying to detach thread that was already detached or joined");
585 }
586
587 self.threads[id].join_status = ThreadJoinStatus::Detached;
588 interp_ok(())
589 }
590
591 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
593 self.threads[thread].thread_name = Some(new_thread_name);
594 }
595
596 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
598 self.threads[thread].thread_name()
599 }
600
601 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
602 self.threads[thread].thread_display_name(thread)
603 }
604
605 fn block_thread(
607 &mut self,
608 reason: BlockReason,
609 timeout: Option<Timeout>,
610 callback: DynUnblockCallback<'tcx>,
611 ) {
612 let state = &mut self.threads[self.active_thread].state;
613 assert!(state.is_enabled());
614 *state = ThreadState::Blocked { reason, timeout, callback }
615 }
616
617 fn yield_active_thread(&mut self) {
619 self.yield_active_thread = true;
623 }
624
625 fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
627 self.threads
628 .iter()
629 .filter_map(|t| {
630 match &t.state {
631 ThreadState::Blocked { timeout: Some(timeout), .. } =>
632 Some(timeout.get_wait_time(clock)),
633 _ => None,
634 }
635 })
636 .min()
637 }
638}
639
640impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
641trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
642 #[inline]
644 fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
645 let this = self.eval_context_mut();
646 let mut found_callback = None;
647 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
649 match &thread.state {
650 ThreadState::Blocked { timeout: Some(timeout), .. }
651 if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
652 {
653 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
654 let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
655 found_callback = Some((id, callback));
656 break;
658 }
659 _ => {}
660 }
661 }
662 if let Some((thread, callback)) = found_callback {
663 let old_thread = this.machine.threads.set_active_thread_id(thread);
670 callback.call(this, UnblockKind::TimedOut)?;
671 this.machine.threads.set_active_thread_id(old_thread);
672 }
673 interp_ok(())
680 }
681
682 #[inline]
683 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
684 let this = self.eval_context_mut();
685 let mut callback = this
686 .active_thread_mut()
687 .on_stack_empty
688 .take()
689 .expect("`on_stack_empty` not set up, or already running");
690 let res = callback(this)?;
691 this.active_thread_mut().on_stack_empty = Some(callback);
692 interp_ok(res)
693 }
694
695 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
704 let this = self.eval_context_mut();
705
706 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
708 let next_thread_id = genmc_ctx.schedule_thread(this)?;
709
710 let thread_manager = &mut this.machine.threads;
711 thread_manager.active_thread = next_thread_id;
712
713 assert!(thread_manager.threads[thread_manager.active_thread].state.is_enabled());
714 return interp_ok(SchedulingAction::ExecuteStep);
715 }
716
717 let thread_manager = &mut this.machine.threads;
719 let clock = &this.machine.monotonic_clock;
720 let rng = this.machine.rng.get_mut();
721 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
723 && !thread_manager.yield_active_thread
724 {
725 return interp_ok(SchedulingAction::ExecuteStep);
727 }
728 let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
735 if potential_sleep_time == Some(Duration::ZERO) {
736 return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
737 }
738 let mut threads_iter = thread_manager
745 .threads
746 .iter_enumerated()
747 .skip(thread_manager.active_thread.index() + 1)
748 .chain(
749 thread_manager
750 .threads
751 .iter_enumerated()
752 .take(thread_manager.active_thread.index() + 1),
753 )
754 .filter(|(_id, thread)| thread.state.is_enabled());
755 let new_thread = if thread_manager.fixed_scheduling {
757 threads_iter.next()
758 } else {
759 threads_iter.choose(rng)
760 };
761
762 if let Some((id, _thread)) = new_thread {
763 if thread_manager.active_thread != id {
764 info!(
765 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
766 thread_manager.get_thread_display_name(id),
767 thread_manager.get_thread_display_name(thread_manager.active_thread)
768 );
769 thread_manager.active_thread = id;
770 }
771 }
772 thread_manager.yield_active_thread = false;
774
775 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
776 return interp_ok(SchedulingAction::ExecuteStep);
777 }
778 if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
780 unreachable!("all threads terminated without the main thread terminating?!");
781 } else if let Some(sleep_time) = potential_sleep_time {
782 interp_ok(SchedulingAction::Sleep(sleep_time))
786 } else {
787 throw_machine_stop!(TerminationInfo::Deadlock);
788 }
789 }
790}
791
792impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
794pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
795 #[inline]
796 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadNotFound> {
797 self.eval_context_ref().machine.threads.thread_id_try_from(id)
798 }
799
800 fn get_or_create_thread_local_alloc(
803 &mut self,
804 def_id: DefId,
805 ) -> InterpResult<'tcx, StrictPointer> {
806 let this = self.eval_context_mut();
807 let tcx = this.tcx;
808 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
809 interp_ok(old_alloc)
812 } else {
813 if tcx.is_foreign_item(def_id) {
817 throw_unsup_format!("foreign thread-local statics are not supported");
818 }
819 let params = this.machine.get_default_alloc_params();
820 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
821 let mut alloc = alloc.inner().adjust_from_tcx(
823 &this.tcx,
824 |bytes, align| {
825 interp_ok(MiriAllocBytes::from_bytes(
826 std::borrow::Cow::Borrowed(bytes),
827 align,
828 params,
829 ))
830 },
831 |ptr| this.global_root_pointer(ptr),
832 )?;
833 alloc.mutability = Mutability::Mut;
835 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
837 this.machine.threads.set_thread_local_alloc(def_id, ptr);
838 interp_ok(ptr)
839 }
840 }
841
842 #[inline]
844 fn start_regular_thread(
845 &mut self,
846 thread: Option<MPlaceTy<'tcx>>,
847 start_routine: Pointer,
848 start_abi: ExternAbi,
849 func_arg: ImmTy<'tcx>,
850 ret_layout: TyAndLayout<'tcx>,
851 ) -> InterpResult<'tcx, ThreadId> {
852 let this = self.eval_context_mut();
853
854 let new_thread_id = this.machine.threads.create_thread({
856 let mut state = tls::TlsDtorsState::default();
857 Box::new(move |m| state.on_stack_empty(m))
858 });
859 let current_span = this.machine.current_span();
860 match &mut this.machine.data_race {
861 GlobalDataRaceHandler::None => {}
862 GlobalDataRaceHandler::Vclocks(data_race) =>
863 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
864 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
865 genmc_ctx.handle_thread_create(
866 &this.machine.threads,
867 start_routine,
868 &func_arg,
869 new_thread_id,
870 )?,
871 }
872 if let Some(thread_info_place) = thread {
875 this.write_scalar(
876 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
877 &thread_info_place,
878 )?;
879 }
880
881 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
884
885 if let Some(cpuset) = this.machine.thread_cpu_affinity.get(&old_thread_id).cloned() {
887 this.machine.thread_cpu_affinity.insert(new_thread_id, cpuset);
888 }
889
890 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
892
893 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
897
898 this.call_function(
899 instance,
900 start_abi,
901 &[func_arg],
902 Some(&ret_place),
903 ReturnContinuation::Stop { cleanup: true },
904 )?;
905
906 this.machine.threads.set_active_thread_id(old_thread_id);
908
909 interp_ok(new_thread_id)
910 }
911
912 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
917 let this = self.eval_context_mut();
918
919 let thread = this.active_thread_mut();
921 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
922 thread.state = ThreadState::Terminated;
923
924 let gone_thread = this.active_thread();
926 {
927 let mut free_tls_statics = Vec::new();
928 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
929 if thread != gone_thread {
930 return true;
932 }
933 free_tls_statics.push(alloc_id);
936 false
937 });
938 for ptr in free_tls_statics {
940 match tls_alloc_action {
941 TlsAllocAction::Deallocate =>
942 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
943 TlsAllocAction::Leak =>
944 if let Some(alloc) = ptr.provenance.get_alloc_id() {
945 trace!(
946 "Thread-local static leaked and stored as static root: {:?}",
947 alloc
948 );
949 this.machine.static_roots.push(alloc);
950 },
951 }
952 }
953 }
954
955 match &mut this.machine.data_race {
956 GlobalDataRaceHandler::None => {}
957 GlobalDataRaceHandler::Vclocks(data_race) =>
958 data_race.thread_terminated(&this.machine.threads),
959 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
960 genmc_ctx.handle_thread_finish(&this.machine.threads)
963 }
964 }
965
966 let unblock_reason = BlockReason::Join(gone_thread);
968 let threads = &this.machine.threads.threads;
969 let joining_threads = threads
970 .iter_enumerated()
971 .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
972 .map(|(id, _)| id)
973 .collect::<Vec<_>>();
974 for thread in joining_threads {
975 this.unblock_thread(thread, unblock_reason)?;
976 }
977
978 interp_ok(())
979 }
980
981 #[inline]
984 fn block_thread(
985 &mut self,
986 reason: BlockReason,
987 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
988 callback: DynUnblockCallback<'tcx>,
989 ) {
990 let this = self.eval_context_mut();
991 if timeout.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
992 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
993 }
994 let timeout = timeout.map(|(clock, anchor, duration)| {
995 let anchor = match clock {
996 TimeoutClock::RealTime => {
997 assert!(
998 this.machine.communicate(),
999 "cannot have `RealTime` timeout with isolation enabled!"
1000 );
1001 Timeout::RealTime(match anchor {
1002 TimeoutAnchor::Absolute => SystemTime::UNIX_EPOCH,
1003 TimeoutAnchor::Relative => SystemTime::now(),
1004 })
1005 }
1006 TimeoutClock::Monotonic =>
1007 Timeout::Monotonic(match anchor {
1008 TimeoutAnchor::Absolute => this.machine.monotonic_clock.epoch(),
1009 TimeoutAnchor::Relative => this.machine.monotonic_clock.now(),
1010 }),
1011 };
1012 anchor.add_lossy(duration)
1013 });
1014 this.machine.threads.block_thread(reason, timeout, callback);
1015 }
1016
1017 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1020 let this = self.eval_context_mut();
1021 let old_state =
1022 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1023 let callback = match old_state {
1024 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1025 assert_eq!(
1026 reason, actual_reason,
1027 "unblock_thread: thread was blocked for the wrong reason"
1028 );
1029 callback
1030 }
1031 _ => panic!("unblock_thread: thread was not blocked"),
1032 };
1033 let old_thread = this.machine.threads.set_active_thread_id(thread);
1035 callback.call(this, UnblockKind::Ready)?;
1036 this.machine.threads.set_active_thread_id(old_thread);
1037 interp_ok(())
1038 }
1039
1040 #[inline]
1041 fn detach_thread(
1042 &mut self,
1043 thread_id: ThreadId,
1044 allow_terminated_joined: bool,
1045 ) -> InterpResult<'tcx> {
1046 let this = self.eval_context_mut();
1047 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1048 }
1049
1050 fn join_thread(
1054 &mut self,
1055 joined_thread_id: ThreadId,
1056 success_retval: Scalar,
1057 return_dest: &MPlaceTy<'tcx>,
1058 ) -> InterpResult<'tcx> {
1059 let this = self.eval_context_mut();
1060 let thread_mgr = &mut this.machine.threads;
1061 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1062 throw_ub_format!("trying to join a detached thread");
1064 }
1065
1066 fn after_join<'tcx>(
1067 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1068 joined_thread_id: ThreadId,
1069 success_retval: Scalar,
1070 return_dest: &MPlaceTy<'tcx>,
1071 ) -> InterpResult<'tcx> {
1072 let threads = &this.machine.threads;
1073 match &mut this.machine.data_race {
1074 GlobalDataRaceHandler::None => {}
1075 GlobalDataRaceHandler::Vclocks(data_race) =>
1076 data_race.thread_joined(threads, joined_thread_id),
1077 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1078 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1079 }
1080 this.write_scalar(success_retval, return_dest)?;
1081 interp_ok(())
1082 }
1083
1084 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1087 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1088 trace!(
1089 "{:?} blocked on {:?} when trying to join",
1090 thread_mgr.active_thread, joined_thread_id
1091 );
1092 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1093 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1094 }
1095
1096 let dest = return_dest.clone();
1099 thread_mgr.block_thread(
1100 BlockReason::Join(joined_thread_id),
1101 None,
1102 callback!(
1103 @capture<'tcx> {
1104 joined_thread_id: ThreadId,
1105 dest: MPlaceTy<'tcx>,
1106 success_retval: Scalar,
1107 }
1108 |this, unblock: UnblockKind| {
1109 assert_eq!(unblock, UnblockKind::Ready);
1110 after_join(this, joined_thread_id, success_retval, &dest)
1111 }
1112 ),
1113 );
1114 } else {
1115 after_join(this, joined_thread_id, success_retval, return_dest)?;
1117 }
1118 interp_ok(())
1119 }
1120
1121 fn join_thread_exclusive(
1126 &mut self,
1127 joined_thread_id: ThreadId,
1128 success_retval: Scalar,
1129 return_dest: &MPlaceTy<'tcx>,
1130 ) -> InterpResult<'tcx> {
1131 let this = self.eval_context_mut();
1132 let threads = &this.machine.threads.threads;
1133 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1134 throw_ub_format!("trying to join an already joined thread");
1135 }
1136
1137 if joined_thread_id == this.machine.threads.active_thread {
1138 throw_ub_format!("trying to join itself");
1139 }
1140
1141 assert!(
1143 threads
1144 .iter()
1145 .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1146 "this thread already has threads waiting for its termination"
1147 );
1148
1149 this.join_thread(joined_thread_id, success_retval, return_dest)
1150 }
1151
1152 #[inline]
1153 fn active_thread(&self) -> ThreadId {
1154 let this = self.eval_context_ref();
1155 this.machine.threads.active_thread()
1156 }
1157
1158 #[inline]
1159 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1160 let this = self.eval_context_mut();
1161 this.machine.threads.active_thread_mut()
1162 }
1163
1164 #[inline]
1165 fn active_thread_ref(&self) -> &Thread<'tcx> {
1166 let this = self.eval_context_ref();
1167 this.machine.threads.active_thread_ref()
1168 }
1169
1170 #[inline]
1171 fn get_total_thread_count(&self) -> usize {
1172 let this = self.eval_context_ref();
1173 this.machine.threads.get_total_thread_count()
1174 }
1175
1176 #[inline]
1177 fn have_all_terminated(&self) -> bool {
1178 let this = self.eval_context_ref();
1179 this.machine.threads.have_all_terminated()
1180 }
1181
1182 #[inline]
1183 fn enable_thread(&mut self, thread_id: ThreadId) {
1184 let this = self.eval_context_mut();
1185 this.machine.threads.enable_thread(thread_id);
1186 }
1187
1188 #[inline]
1189 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1190 let this = self.eval_context_ref();
1191 this.machine.threads.active_thread_stack()
1192 }
1193
1194 #[inline]
1195 fn active_thread_stack_mut<'a>(
1196 &'a mut self,
1197 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1198 let this = self.eval_context_mut();
1199 this.machine.threads.active_thread_stack_mut()
1200 }
1201
1202 #[inline]
1204 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1205 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1206 }
1207
1208 #[inline]
1209 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1210 where
1211 'tcx: 'c,
1212 {
1213 self.eval_context_ref().machine.threads.get_thread_name(thread)
1214 }
1215
1216 #[inline]
1217 fn yield_active_thread(&mut self) {
1218 self.eval_context_mut().machine.threads.yield_active_thread();
1219 }
1220
1221 #[inline]
1222 fn maybe_preempt_active_thread(&mut self) {
1223 use rand::Rng as _;
1224
1225 let this = self.eval_context_mut();
1226 if !this.machine.threads.fixed_scheduling
1227 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1228 {
1229 this.yield_active_thread();
1230 }
1231 }
1232
1233 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1236 let this = self.eval_context_mut();
1237 loop {
1238 if CTRL_C_RECEIVED.load(Relaxed) {
1239 this.machine.handle_abnormal_termination();
1240 throw_machine_stop!(TerminationInfo::Interrupted);
1241 }
1242 match this.schedule()? {
1243 SchedulingAction::ExecuteStep => {
1244 if !this.step()? {
1245 match this.run_on_stack_empty()? {
1247 Poll::Pending => {} Poll::Ready(()) =>
1249 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1250 }
1251 }
1252 }
1253 SchedulingAction::ExecuteTimeoutCallback => {
1254 this.run_timeout_callback()?;
1255 }
1256 SchedulingAction::Sleep(duration) => {
1257 this.machine.monotonic_clock.sleep(duration);
1258 }
1259 }
1260 }
1261 }
1262}