Skip to main content

miri/concurrency/
blocking_io.rs

1use std::collections::BTreeMap;
2use std::io;
3use std::time::Duration;
4
5use mio::event::Source;
6use mio::{Events, Interest, Poll, Token};
7use rustc_data_structures::fx::FxHashMap;
8
9use crate::shims::{FdId, FileDescriptionRef};
10use crate::*;
11
/// Capacity of the event queue which can be polled at a time, i.e. the value
/// passed to [`Events::with_capacity`] when the manager is created.
/// Since we don't expect many simultaneous blocking I/O events,
/// this value can be set rather low.
const IO_EVENT_CAPACITY: usize = 16;
16
/// Trait for values that contain a mio [`Source`].
///
/// The source is not handed out directly; instead, a callback is run on it.
pub trait WithSource {
    /// Invoke `f` on the source inside `self`.
    ///
    /// The callers in this file use the returned [`io::Result`] to detect failures
    /// of the (re-/de-)registration performed inside `f`.
    fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
}
22
/// An interest receiver defines the action that should be taken when
/// the associated [`Interest`] is fulfilled.
///
/// Receivers are used as the keys of the per-source interest map inside
/// [`BlockingIoManager`], hence the `Hash` and `Eq` derives.
#[derive(Debug, Hash, PartialEq, Clone, Copy, Eq, PartialOrd, Ord)]
pub enum InterestReceiver {
    /// The specified thread should be unblocked.
    UnblockThread(ThreadId),
}
30
/// Manager that handles blocking host I/O in a non-blocking manner.
/// We use [`Poll`] to poll for new I/O events from the OS for sources
/// registered using this manager.
///
/// Since blocking host I/O is inherently non-deterministic, no method on this
/// manager should be called when isolation is enabled. The only exception is
/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
/// we assert that isolation is disabled!
pub struct BlockingIoManager {
    /// Poll instance to monitor I/O events from the OS.
    /// This is only [`None`] when Miri is run with isolation enabled.
    poll: Option<Poll>,
    /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
    /// This is not part of the state and only stored to avoid allocating a
    /// new buffer for every poll.
    events: Events,
    /// Map from source ids to the actual sources and their registered receivers
    /// together with their associated interests.
    ///
    /// An entry exists exactly as long as at least one receiver is registered
    /// for the source; the entry is removed together with its last receiver.
    sources:
        BTreeMap<FdId, (FileDescriptionRef<dyn WithSource>, FxHashMap<InterestReceiver, Interest>)>,
}
52
53impl BlockingIoManager {
54    /// Create a new blocking I/O manager instance based on the availability
55    /// of communication with the host.
56    pub fn new(communicate: bool) -> Result<Self, io::Error> {
57        let manager = Self {
58            poll: communicate.then_some(Poll::new()?),
59            events: Events::with_capacity(IO_EVENT_CAPACITY),
60            sources: BTreeMap::default(),
61        };
62        Ok(manager)
63    }
64
65    /// Poll for new I/O events from the OS or wait until the timeout expired.
66    ///
67    /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
68    ///   reads all events since the last poll.
69    /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
70    ///   specified duration.
71    /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
72    ///
73    /// Returns the interest receivers for all file descriptions which received an I/O event together
74    /// with the file description they were registered for.
75    pub fn poll(
76        &mut self,
77        timeout: Option<Duration>,
78    ) -> Result<Vec<(InterestReceiver, FileDescriptionRef<dyn WithSource>)>, io::Error> {
79        let poll =
80            self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
81
82        // Poll for new I/O events from OS and store them in the events buffer.
83        poll.poll(&mut self.events, timeout)?;
84
85        let ready = self
86            .events
87            .iter()
88            .flat_map(|event| {
89                let token = event.token();
90                // We know all tokens are valid `FdId`.
91                let fd_id = FdId::new_unchecked(token.0);
92                let (source, interests) =
93                    self.sources.get(&fd_id).expect("Source should be registered");
94                assert_eq!(source.id(), fd_id);
95                // Because we allow spurious wake-ups, we mark all interests as ready even
96                // though some may not have been fulfilled.
97                interests.keys().map(move |receiver| (*receiver, source.clone()))
98            })
99            .collect::<Vec<_>>();
100
101        // Deregister all ready sources as we only want to receive one event per receiver.
102        ready.iter().for_each(|(receiver, source)| self.deregister(source.id(), *receiver));
103
104        Ok(ready)
105    }
106
107    /// Register an interest for a blocking I/O source.
108    ///
109    /// As the OS can always produce spurious wake-ups, it's the callers responsibility to
110    /// verify the requested I/O interests are really ready and to register again if they're not.
111    ///
112    /// It's assumed that no interest is already registered for this source with the same reason!
113    pub fn register(
114        &mut self,
115        source_fd: FileDescriptionRef<dyn WithSource>,
116        receiver: InterestReceiver,
117        interest: Interest,
118    ) {
119        let poll =
120            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
121
122        let id = source_fd.id();
123        let token = Token(id.to_usize());
124
125        let Some((_, current_interests)) = self.sources.get_mut(&id) else {
126            // The source is not yet registered.
127
128            // Treat errors from registering as fatal. On UNIX hosts this can only
129            // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
130            source_fd
131                .with_source(&mut |source| poll.registry().register(source, token, interest))
132                .unwrap();
133
134            self.sources.insert(id, (source_fd, FxHashMap::from_iter([(receiver, interest)])));
135            return;
136        };
137
138        // The source is already registered. We need to check whether we need to
139        // reregister because the provided interest contains new interests for the source.
140
141        let old_interest =
142            interest_union(current_interests).expect("Source should contain at least one interest");
143
144        current_interests
145            .try_insert(receiver, interest)
146            .unwrap_or_else(|_| panic!("Receiver should be unique"));
147
148        let new_interest = old_interest.add(interest);
149
150        // Reregister the source since the overall interests might have changed.
151
152        // Treat errors from reregistering as fatal. On UNIX hosts this can only
153        // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
154        source_fd
155            .with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
156            .unwrap();
157    }
158
159    /// Deregister an interest from a blocking I/O source.
160    ///
161    /// The receiver is assumed to be registered for the provided source!
162    pub fn deregister(&mut self, source_id: FdId, receiver: InterestReceiver) {
163        let poll =
164            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
165
166        let token = Token(source_id.to_usize());
167        let (fd, current_interests) =
168            self.sources.get_mut(&source_id).expect("Source should be registered");
169
170        current_interests
171            .remove(&receiver)
172            .unwrap_or_else(|| panic!("Receiver should be registered for source"));
173
174        let Some(new_interest) = interest_union(current_interests) else {
175            // There are no longer any interests in this source.
176            // We can thus deregister the source from the poll.
177
178            // Treat errors from deregistering as fatal. On UNIX hosts this can only
179            // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
180            fd.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
181            self.sources.remove(&source_id);
182            return;
183        };
184
185        // Reregister the source since the overall interests might have changed.
186
187        // Treat errors from reregistering as fatal. On UNIX hosts this can only
188        // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
189        fd.with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
190            .unwrap();
191    }
192}
193
194/// Get the union of all interests for a source. Returns `None` if the map is empty.
195fn interest_union(interests: &FxHashMap<InterestReceiver, Interest>) -> Option<Interest> {
196    interests
197        .values()
198        .copied()
199        .fold(None, |acc, interest| acc.map(|acc: Interest| acc.add(interest)).or(Some(interest)))
200}
201
202impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
203pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
204    /// Block the current thread until some interests on an I/O source
205    /// are fulfilled or the optional timeout exceeded.
206    /// The callback will be invoked when the thread gets unblocked.
207    ///
208    /// There can be spurious wake-ups by the OS and thus it's the callers
209    /// responsibility to verify that the requested I/O interests are
210    /// really ready and to block again if they're not.
211    #[inline]
212    fn block_thread_for_io(
213        &mut self,
214        source_fd: FileDescriptionRef<dyn WithSource>,
215        interests: Interest,
216        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
217        callback: DynUnblockCallback<'tcx>,
218    ) {
219        let this = self.eval_context_mut();
220        this.machine.blocking_io.register(
221            source_fd,
222            InterestReceiver::UnblockThread(this.machine.threads.active_thread()),
223            interests,
224        );
225        this.block_thread(BlockReason::IO, timeout, callback);
226    }
227}