miri/concurrency/blocking_io.rs
1use std::io;
2use std::time::Duration;
3
4use mio::event::Source;
5use mio::{Events, Interest, Poll, Token};
6use rustc_data_structures::fx::FxHashMap;
7
8use crate::*;
9
10/// Capacity of the event queue which can be polled at a time.
11/// Since we don't expect many simultaneous blocking I/O events
12/// this value can be set rather low.
13const IO_EVENT_CAPACITY: usize = 16;
14
15/// Trait for values that contain a mio [`Source`].
16pub trait WithSource {
17 /// Invoke `f` on the source inside `self`.
18 fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
19}
20
21/// Manager for managing blocking host I/O in a non-blocking manner.
22/// We use [`Poll`] to poll for new I/O events from the OS for sources
23/// registered using this manager.
24///
25/// Since blocking host I/O is inherently non-deterministic, no method on this
26/// manager should be called when isolation is enabled. The only exception is
27/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
28/// we assert that isolation is disabled!
29pub struct BlockingIoManager {
30 /// Poll instance to monitor I/O events from the OS.
31 /// This is only [`None`] when Miri is run with isolation enabled.
32 poll: Option<Poll>,
33 /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
34 /// This is not part of the state and only stored to avoid allocating a
35 /// new buffer for every poll.
36 events: Events,
37 /// Map between threads which are currently blocked and the
38 /// underlying I/O source.
39 sources: FxHashMap<ThreadId, Box<dyn WithSource>>,
40}
41
42impl BlockingIoManager {
43 /// Create a new blocking I/O manager instance based on the availability
44 /// of communication with the host.
45 pub fn new(communicate: bool) -> Result<Self, io::Error> {
46 let manager = Self {
47 poll: communicate.then_some(Poll::new()?),
48 events: Events::with_capacity(IO_EVENT_CAPACITY),
49 sources: FxHashMap::default(),
50 };
51 Ok(manager)
52 }
53
54 /// Poll for new I/O events from the OS or wait until the timeout expired.
55 ///
56 /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
57 /// reads all events since the last poll.
58 /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
59 /// specified duration.
60 /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
61 ///
62 /// Returns all threads that are ready because they received an I/O event.
63 pub fn poll(&mut self, timeout: Option<Duration>) -> Result<Vec<ThreadId>, io::Error> {
64 let poll =
65 self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
66
67 // Poll for new I/O events from OS and store them in the events buffer.
68 poll.poll(&mut self.events, timeout)?;
69
70 let ready = self
71 .events
72 .iter()
73 .map(|event| {
74 let token = event.token();
75 ThreadId::new_unchecked(token.0.try_into().unwrap())
76 })
77 .collect::<Vec<_>>();
78
79 // Deregister all ready sources as we only want to receive one event per thread.
80 ready.iter().for_each(|thread_id| self.deregister(*thread_id));
81
82 Ok(ready)
83 }
84
85 /// Register a blocking I/O source for a thread together with it's poll interests.
86 ///
87 /// The source will be deregistered automatically once an event for it is received.
88 ///
89 /// As the OS can always produce spurious wake-ups, it's the callers responsibility to
90 /// verify the requested I/O interests are really ready and to register again if they're not.
91 pub fn register(&mut self, source: Box<dyn WithSource>, thread: ThreadId, interests: Interest) {
92 let poll =
93 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
94
95 let token = Token(thread.to_u32().to_usize());
96
97 // Treat errors from registering as fatal. On UNIX hosts this can only
98 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
99 source
100 .with_source(&mut |source| source.register(poll.registry(), token, interests))
101 .unwrap();
102 self.sources
103 .try_insert(thread, source)
104 .unwrap_or_else(|_| panic!("A thread cannot be registered twice at the same time"));
105 }
106
107 /// Deregister the event source for a thread. Returns the kind of I/O the thread was
108 /// blocked on.
109 fn deregister(&mut self, thread: ThreadId) {
110 let poll =
111 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
112
113 let Some(source) = self.sources.remove(&thread) else {
114 panic!("Attempt to deregister a token which isn't registered")
115 };
116
117 // Treat errors from deregistering as fatal. On UNIX hosts this can only
118 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
119 source.with_source(&mut |source| source.deregister(poll.registry())).unwrap();
120 }
121}
122
123impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
124pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
125 /// Block the current thread until some interests on an I/O source
126 /// are fulfilled or the optional timeout exceeded.
127 /// The callback will be invoked when the thread gets unblocked.
128 ///
129 /// There can be spurious wake-ups by the OS and thus it's the callers
130 /// responsibility to verify that the requested I/O interests are
131 /// really ready and to block again if they're not.
132 #[inline]
133 fn block_thread_for_io(
134 &mut self,
135 source: impl WithSource + 'static,
136 interests: Interest,
137 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
138 callback: DynUnblockCallback<'tcx>,
139 ) {
140 let this = self.eval_context_mut();
141 this.machine.blocking_io.register(
142 Box::new(source),
143 this.machine.threads.active_thread(),
144 interests,
145 );
146 this.block_thread(BlockReason::IO, timeout, callback);
147 }
148}