Skip to main content

miri/concurrency/
blocking_io.rs

1use std::io;
2use std::time::Duration;
3
4use mio::event::Source;
5use mio::{Events, Interest, Poll, Token};
6use rustc_data_structures::fx::FxHashMap;
7
8use crate::*;
9
/// Capacity of the event buffer handed to a single poll of the host OS.
/// Since we don't expect many simultaneous blocking I/O events,
/// this value can be set rather low.
const IO_EVENT_CAPACITY: usize = 16;
14
/// Trait for values that contain a mio [`Source`].
///
/// This gives callers access to the contained source through a callback
/// without having to know the concrete type of the value around it.
pub trait WithSource {
    /// Invoke `f` on the source inside `self`.
    ///
    /// `f` receives the source as `&mut dyn Source` and reports success or failure
    /// as an [`io::Result`]. Implementations are presumably expected to return the
    /// result of `f` unchanged (no implementer is visible here, so this is an
    /// assumption to confirm).
    fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
}
20
/// Manages blocking host I/O in a non-blocking manner.
/// We use [`Poll`] to poll for new I/O events from the OS for sources
/// registered using this manager.
///
/// Since blocking host I/O is inherently non-deterministic, no method on this
/// manager should be called when isolation is enabled. The only exception is
/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
/// we assert that isolation is disabled!
pub struct BlockingIoManager {
    /// Poll instance to monitor I/O events from the OS.
    /// This is only [`None`] when Miri is run with isolation enabled.
    poll: Option<Poll>,
    /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
    /// This is not part of the state and is only stored to avoid allocating a
    /// new buffer for every poll.
    events: Events,
    /// Map from each thread that is currently blocked on I/O to the
    /// source it is waiting on. A thread has at most one entry at a time.
    sources: FxHashMap<ThreadId, Box<dyn WithSource>>,
}
41
42impl BlockingIoManager {
43    /// Create a new blocking I/O manager instance based on the availability
44    /// of communication with the host.
45    pub fn new(communicate: bool) -> Result<Self, io::Error> {
46        let manager = Self {
47            poll: communicate.then_some(Poll::new()?),
48            events: Events::with_capacity(IO_EVENT_CAPACITY),
49            sources: FxHashMap::default(),
50        };
51        Ok(manager)
52    }
53
54    /// Poll for new I/O events from the OS or wait until the timeout expired.
55    ///
56    /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
57    ///   reads all events since the last poll.
58    /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
59    ///   specified duration.
60    /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
61    ///
62    /// Returns all threads that are ready because they received an I/O event.
63    pub fn poll(&mut self, timeout: Option<Duration>) -> Result<Vec<ThreadId>, io::Error> {
64        let poll =
65            self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
66
67        // Poll for new I/O events from OS and store them in the events buffer.
68        poll.poll(&mut self.events, timeout)?;
69
70        let ready = self
71            .events
72            .iter()
73            .map(|event| {
74                let token = event.token();
75                ThreadId::new_unchecked(token.0.try_into().unwrap())
76            })
77            .collect::<Vec<_>>();
78
79        // Deregister all ready sources as we only want to receive one event per thread.
80        ready.iter().for_each(|thread_id| self.deregister(*thread_id));
81
82        Ok(ready)
83    }
84
85    /// Register a blocking I/O source for a thread together with it's poll interests.
86    ///
87    /// The source will be deregistered automatically once an event for it is received.
88    ///
89    /// As the OS can always produce spurious wake-ups, it's the callers responsibility to
90    /// verify the requested I/O interests are really ready and to register again if they're not.
91    pub fn register(&mut self, source: Box<dyn WithSource>, thread: ThreadId, interests: Interest) {
92        let poll =
93            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
94
95        let token = Token(thread.to_u32().to_usize());
96
97        // Treat errors from registering as fatal. On UNIX hosts this can only
98        // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
99        source
100            .with_source(&mut |source| source.register(poll.registry(), token, interests))
101            .unwrap();
102        self.sources
103            .try_insert(thread, source)
104            .unwrap_or_else(|_| panic!("A thread cannot be registered twice at the same time"));
105    }
106
107    /// Deregister the event source for a thread. Returns the kind of I/O the thread was
108    /// blocked on.
109    fn deregister(&mut self, thread: ThreadId) {
110        let poll =
111            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
112
113        let Some(source) = self.sources.remove(&thread) else {
114            panic!("Attempt to deregister a token which isn't registered")
115        };
116
117        // Treat errors from deregistering as fatal. On UNIX hosts this can only
118        // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
119        source.with_source(&mut |source| source.deregister(poll.registry())).unwrap();
120    }
121}
122
123impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
124pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
125    /// Block the current thread until some interests on an I/O source
126    /// are fulfilled or the optional timeout exceeded.
127    /// The callback will be invoked when the thread gets unblocked.
128    ///
129    /// There can be spurious wake-ups by the OS and thus it's the callers
130    /// responsibility to verify that the requested I/O interests are
131    /// really ready and to block again if they're not.
132    #[inline]
133    fn block_thread_for_io(
134        &mut self,
135        source: impl WithSource + 'static,
136        interests: Interest,
137        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
138        callback: DynUnblockCallback<'tcx>,
139    ) {
140        let this = self.eval_context_mut();
141        this.machine.blocking_io.register(
142            Box::new(source),
143            this.machine.threads.active_thread(),
144            interests,
145        );
146        this.block_thread(BlockReason::IO, timeout, callback);
147    }
148}