miri/concurrency/blocking_io.rs
1use std::cell::RefMut;
2use std::collections::BTreeMap;
3use std::io;
4use std::ops::BitOrAssign;
5use std::time::Duration;
6
7use mio::event::Source;
8use mio::{Events, Interest, Poll, Token};
9
10use crate::shims::{
11 EpollEvalContextExt, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
12};
13use crate::*;
14
/// Number of I/O events the event queue can hold for a single poll.
/// Only few blocking I/O events are expected to occur simultaneously,
/// so a rather small capacity is sufficient.
const IO_EVENT_CAPACITY: usize = 16;
19
20/// Trait for file descriptions that contain a mio [`Source`].
21pub trait SourceFileDescription: FileDescription {
22 /// Invoke `f` on the source inside `self`.
23 fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
24
25 /// Get a mutable reference to the readiness of the source.
26 fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness>;
27}
28
/// The I/O readiness a blocked thread is waiting for. Independent of the
/// variant, every thread is always considered to be interested in "error"
/// events as well.
#[derive(Clone, Copy, Debug)]
pub enum BlockingIoInterest {
    /// The blocked thread waits for [`Interest::READABLE`].
    Read,
    /// The blocked thread waits for [`Interest::WRITABLE`].
    Write,
    /// The blocked thread is interested in both [`Interest::READABLE`] and
    /// [`Interest::WRITABLE`].
    ReadWrite,
}
41
/// The readiness state of a source file description, as a set of
/// independent flags.
#[derive(Debug)]
pub struct BlockingIoSourceReadiness {
    /// Set while the source is readable.
    pub readable: bool,
    /// Set while the source is writable.
    pub writable: bool,
    /// Set once the read end of the source has been closed.
    pub read_closed: bool,
    /// Set once the write end of the source has been closed.
    pub write_closed: bool,
    /// Set while the source has an error.
    pub error: bool,
}
58
59impl BlockingIoSourceReadiness {
60 pub fn empty() -> Self {
61 Self {
62 readable: false,
63 writable: false,
64 read_closed: false,
65 write_closed: false,
66 error: false,
67 }
68 }
69
70 /// Check whether the current readiness fulfills the blocking I/O interest of
71 /// `interest`.
72 /// This function also returns `true` if the error readiness is set
73 /// even when the requested interest might not be fulfilled.
74 fn fulfills_interest(&self, interest: &BlockingIoInterest) -> bool {
75 match interest {
76 BlockingIoInterest::Read => self.readable || self.error,
77 BlockingIoInterest::Write => self.writable || self.error,
78 BlockingIoInterest::ReadWrite => self.readable || self.writable || self.error,
79 }
80 }
81}
82
83impl BitOrAssign for BlockingIoSourceReadiness {
84 fn bitor_assign(&mut self, rhs: Self) {
85 self.readable |= rhs.readable;
86 self.writable |= rhs.writable;
87 self.read_closed |= rhs.read_closed;
88 self.write_closed |= rhs.write_closed;
89 self.error |= rhs.error;
90 }
91}
92
93impl From<&mio::event::Event> for BlockingIoSourceReadiness {
94 fn from(event: &mio::event::Event) -> Self {
95 Self {
96 readable: event.is_readable(),
97 writable: event.is_writable(),
98 read_closed: event.is_read_closed(),
99 write_closed: event.is_write_closed(),
100 error: event.is_error(),
101 }
102 }
103}
104
105struct BlockingIoSource {
106 /// The source file description which is registered into the poll.
107 /// We only store weak references such that source file descriptions
108 /// can be destroyed whilst they are registered. However, they are required
109 /// to deregister themselves when [`FileDescription::destroy`] is called.
110 fd: WeakFileDescriptionRef<dyn SourceFileDescription>,
111 /// The threads which are blocked on the I/O source, and the interest indicating
112 /// when they should be unblocked.
113 blocked_threads: BTreeMap<ThreadId, BlockingIoInterest>,
114}
115
116/// Manager for managing blocking host I/O in a non-blocking manner.
117/// We use [`Poll`] to poll for new I/O events from the OS for sources
118/// registered using this manager.
119///
120/// The semantics of this manager are that host I/O sources are registered
121/// to a [`Poll`] for their entire lifespan. Once host readiness events happen
122/// on a registered source, its internal epoll readiness gets updated -- even
123/// when the source isn't part of an active epoll instance. Also, for the entire
124/// lifespan of the source, threads can be added which should be unblocked
125/// once a certain [`BlockingIoSourceReadiness`] for an I/O source is satisfied.
126///
127/// Since blocking host I/O is inherently non-deterministic, no method on this
128/// manager should be called when isolation is enabled. The only exception is
129/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
130/// we assert that isolation is disabled!
131pub struct BlockingIoManager {
132 /// Poll instance to monitor I/O events from the OS.
133 /// This is only [`None`] when Miri is run with isolation enabled.
134 poll: Option<Poll>,
135 /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
136 /// This is not part of the state and only stored to avoid allocating a
137 /// new buffer for every poll.
138 events: Events,
139 /// Map from source file description ids to the actual sources and their
140 /// blocked threads.
141 sources: BTreeMap<FdId, BlockingIoSource>,
142}
143
144impl BlockingIoManager {
145 /// Create a new blocking I/O manager instance based on the availability
146 /// of communication with the host.
147 pub fn new(communicate: bool) -> Result<Self, io::Error> {
148 let manager = Self {
149 poll: communicate.then_some(Poll::new()?),
150 events: Events::with_capacity(IO_EVENT_CAPACITY),
151 sources: BTreeMap::default(),
152 };
153 Ok(manager)
154 }
155
156 /// Poll for new I/O events from the OS or wait until the timeout expired.
157 ///
158 /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
159 /// reads all events since the last poll.
160 /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
161 /// specified duration.
162 /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
163 ///
164 /// The events also immediately get processed: threads get unblocked, and epoll readiness gets updated.
165 pub fn poll<'tcx>(
166 ecx: &mut MiriInterpCx<'tcx>,
167 timeout: Option<Duration>,
168 ) -> InterpResult<'tcx, Result<(), io::Error>> {
169 let poll = ecx
170 .machine
171 .blocking_io
172 .poll
173 .as_mut()
174 .expect("Blocking I/O should not be called with isolation enabled");
175
176 // Poll for new I/O events from OS and store them in the events buffer.
177 if let Err(err) = poll.poll(&mut ecx.machine.blocking_io.events, timeout) {
178 return interp_ok(Err(err));
179 };
180
181 let event_fds = ecx
182 .machine
183 .blocking_io
184 .events
185 .iter()
186 .map(|event| {
187 let token = event.token();
188 // We know all tokens are valid `FdId`.
189 let fd_id = FdId::new_unchecked(token.0);
190 let source = ecx
191 .machine
192 .blocking_io
193 .sources
194 .get(&fd_id)
195 .expect("Source should be registered");
196 let fd = source.fd.upgrade().expect(
197 "Source file description shouldn't be destroyed whilst being registered",
198 );
199
200 assert_eq!(fd.id(), fd_id);
201 // Update the readiness of the source.
202 *fd.get_readiness_mut() |= BlockingIoSourceReadiness::from(event);
203 // Put FD into `event_fds` list.
204 fd
205 })
206 .collect::<Vec<_>>();
207
208 // Update the epoll readiness for all source file descriptions which received an event. Also,
209 // unblock the threads which are blocked on such a source and whose interests are now fulfilled.
210 for fd in event_fds.into_iter() {
211 // Update epoll readiness for the `fd` source.
212 ecx.update_epoll_active_events(fd.clone(), false)?;
213
214 let source =
215 ecx.machine.blocking_io.sources.get(&fd.id()).expect(
216 "Source file description shouldn't be destroyed whilst being registered",
217 );
218
219 // List of all thread id's whose interests are currently fulfilled
220 // and which are blocked on the `fd` source. This also includes
221 // threads whose interests were already fulfilled before the
222 // `poll` invocation.
223 let threads = source
224 .blocked_threads
225 .iter()
226 .filter_map(|(thread_id, interest)| {
227 fd.get_readiness_mut().fulfills_interest(interest).then_some(*thread_id)
228 })
229 .collect::<Vec<_>>();
230
231 // Unblock all threads whose interests are currently fulfilled and
232 // which are blocked on the `fd` source.
233 threads
234 .into_iter()
235 .try_for_each(|thread_id| ecx.unblock_thread(thread_id, BlockReason::IO))?;
236 }
237
238 interp_ok(Ok(()))
239 }
240
241 /// Return whether a source file description is currently registered in the
242 /// blocking I/O poll.
243 /// This can also be used to check whether a file description is a host
244 /// I/O source.
245 pub fn contains_source(&self, source_id: &FdId) -> bool {
246 self.sources.contains_key(source_id)
247 }
248
249 /// Register a source file description to the blocking I/O poll.
250 pub fn register(&mut self, source_fd: FileDescriptionRef<dyn SourceFileDescription>) {
251 let poll =
252 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
253
254 let id = source_fd.id();
255 let token = Token(id.to_usize());
256
257 // All possible interests.
258 // We only care about the readable and writable interests because those are the only
259 // interests which are available on all platforms. Internally, mio also
260 // registers an error interest.
261 let interest = Interest::READABLE | Interest::WRITABLE;
262
263 // Treat errors from registering as fatal. On UNIX hosts this can only
264 // fail due to system resource errors (e.g. ENOMEM or ENOSPC) or when the source is already registered.
265 source_fd
266 .with_source(&mut |source| poll.registry().register(source, token, interest))
267 .unwrap();
268
269 let source = BlockingIoSource {
270 fd: FileDescriptionRef::downgrade(&source_fd),
271 blocked_threads: BTreeMap::default(),
272 };
273
274 self.sources
275 .try_insert(id, source)
276 .unwrap_or_else(|_| panic!("Source should not already be registered"));
277 }
278
279 /// Deregister a source file description from the blocking I/O poll.
280 ///
281 /// It's assumed that the file description with id `source_id` is already
282 /// removed from the file description table.
283 pub fn deregister(&mut self, source_id: FdId, source: impl SourceFileDescription) {
284 let poll =
285 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
286
287 let stored_source = self.sources.remove(&source_id).expect("Source should be registered");
288 // Ensure that the source file description is already removed from the file
289 // description table.
290 assert!(
291 stored_source.fd.upgrade().is_none(),
292 "Sources must only be deregistered when they are destroyed"
293 );
294
295 // Because we only store `WeakFileDescriptionRef`s and the `stored_source` file description
296 // is already destroyed, the weak reference can no longer be upgraded. Thus, we cannot use
297 // it to deregister the source from the poll and instead use the `source` argument to deregister.
298
299 // Treat errors from deregistering as fatal. On UNIX hosts this can only
300 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
301 source.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
302 }
303
304 /// Add a new blocked thread to a registered source. The thread gets unblocked
305 /// once its [`BlockingIoInterest`] is fulfilled when calling
306 /// [`BlockingIoManager::poll`].
307 ///
308 /// It's assumed that the thread of `thread_id` isn't already blocked on
309 /// the source with id `source_id` and that this source is currently
310 /// registered.
311 fn add_blocked_thread(
312 &mut self,
313 source_id: FdId,
314 thread_id: ThreadId,
315 interest: BlockingIoInterest,
316 ) {
317 let source = self.sources.get_mut(&source_id).expect("Source should be registered");
318
319 source
320 .blocked_threads
321 .try_insert(thread_id, interest)
322 .expect("Thread cannot be blocked multiple times on the same source");
323 }
324
325 /// Remove a blocked thread from a registered source.
326 ///
327 /// It's assumed that the thread of `thread_id` is blocked on the
328 /// source with id `source_id` and that this source is currently
329 /// registered.
330 pub fn remove_blocked_thread(&mut self, source_id: FdId, thread_id: ThreadId) {
331 let source = self.sources.get_mut(&source_id).expect("Source should be registered");
332 source.blocked_threads.remove(&thread_id).expect("Thread should be blocked on source");
333 }
334}
335
336impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
337pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
338 /// Block the current thread until some interests on an I/O source
339 /// are fulfilled or the optional timeout exceeded.
340 /// The callback will be invoked when the thread gets unblocked.
341 ///
342 /// Note that an error interest is implicitly added to `interest`.
343 /// This means that the thread will also be unblocked when the error
344 /// readiness gets set for the source even when the requested interest
345 /// might not be fulfilled.
346 ///
347 /// There can also be spurious wake-ups by the OS and thus it's the callers
348 /// responsibility to verify that the requested I/O interests are
349 /// really ready and to block again if they're not.
350 ///
351 /// It's the callers responsibility to remove the [`BlockingIoInterest`]
352 /// from the blocking I/O manager in the provided callback function.
353 #[inline]
354 fn block_thread_for_io(
355 &mut self,
356 source_fd: FileDescriptionRef<dyn SourceFileDescription>,
357 interest: BlockingIoInterest,
358 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
359 callback: DynUnblockCallback<'tcx>,
360 ) -> InterpResult<'tcx> {
361 let this = self.eval_context_mut();
362
363 // We always have to do this since the thread will de-register itself.
364 this.machine.blocking_io.add_blocked_thread(source_fd.id(), this.active_thread(), interest);
365
366 if source_fd.get_readiness_mut().fulfills_interest(&interest) {
367 // The requested readiness is currently already fulfilled for the provided source.
368 // Instead of actually blocking the thread, we just run the callback function.
369 callback.call(this, UnblockKind::Ready)
370 } else {
371 // The I/O readiness is currently not fulfilled. We block the thread
372 // until the readiness is fulfilled and execute the callback then.
373 this.block_thread(BlockReason::IO, timeout, callback);
374 interp_ok(())
375 }
376 }
377}