Skip to main content

miri/concurrency/
blocking_io.rs

1use std::cell::RefMut;
2use std::collections::BTreeMap;
3use std::io;
4use std::ops::BitOrAssign;
5use std::time::Duration;
6
7use mio::event::Source;
8use mio::{Events, Interest, Poll, Token};
9
10use crate::shims::{
11    EpollEvalContextExt, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
12};
13use crate::*;
14
/// Number of I/O events the event buffer can hold per poll.
/// Only a few blocking I/O events are expected to be pending
/// simultaneously, so a small buffer is sufficient.
const IO_EVENT_CAPACITY: usize = 16;
19
20/// Trait for file descriptions that contain a mio [`Source`].
21pub trait SourceFileDescription: FileDescription {
22    /// Invoke `f` on the source inside `self`.
23    fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
24
25    /// Get a mutable reference to the readiness of the source.
26    fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness>;
27}
28
/// The kind of I/O readiness a blocked thread is waiting for. Regardless of the
/// variant, every thread is additionally treated as interested in "error" events.
#[derive(Debug, Clone, Copy)]
pub enum BlockingIoInterest {
    /// Waiting for [`Interest::READABLE`].
    Read,
    /// Waiting for [`Interest::WRITABLE`].
    Write,
    /// Waiting for both [`Interest::READABLE`] and
    /// [`Interest::WRITABLE`].
    ReadWrite,
}
41
/// Readiness flags recorded for a source file description.
#[derive(Debug)]
pub struct BlockingIoSourceReadiness {
    /// Set while the source can be read from.
    pub readable: bool,
    /// Set while the source can be written to.
    pub writable: bool,
    /// Set once the read end of the source has been closed.
    pub read_closed: bool,
    /// Set once the write end of the source has been closed.
    pub write_closed: bool,
    /// Set while the source is in an error state.
    pub error: bool,
}
58
59impl BlockingIoSourceReadiness {
60    pub fn empty() -> Self {
61        Self {
62            readable: false,
63            writable: false,
64            read_closed: false,
65            write_closed: false,
66            error: false,
67        }
68    }
69
70    /// Check whether the current readiness fulfills the blocking I/O interest of
71    /// `interest`.
72    /// This function also returns `true` if the error readiness is set
73    /// even when the requested interest might not be fulfilled.
74    fn fulfills_interest(&self, interest: &BlockingIoInterest) -> bool {
75        match interest {
76            BlockingIoInterest::Read => self.readable || self.error,
77            BlockingIoInterest::Write => self.writable || self.error,
78            BlockingIoInterest::ReadWrite => self.readable || self.writable || self.error,
79        }
80    }
81}
82
83impl BitOrAssign for BlockingIoSourceReadiness {
84    fn bitor_assign(&mut self, rhs: Self) {
85        self.readable |= rhs.readable;
86        self.writable |= rhs.writable;
87        self.read_closed |= rhs.read_closed;
88        self.write_closed |= rhs.write_closed;
89        self.error |= rhs.error;
90    }
91}
92
93impl From<&mio::event::Event> for BlockingIoSourceReadiness {
94    fn from(event: &mio::event::Event) -> Self {
95        Self {
96            readable: event.is_readable(),
97            writable: event.is_writable(),
98            read_closed: event.is_read_closed(),
99            write_closed: event.is_write_closed(),
100            error: event.is_error(),
101        }
102    }
103}
104
105struct BlockingIoSource {
106    /// The source file description which is registered into the poll.
107    /// We only store weak references such that source file descriptions
108    /// can be destroyed whilst they are registered. However, they are required
109    /// to deregister themselves when [`FileDescription::destroy`] is called.
110    fd: WeakFileDescriptionRef<dyn SourceFileDescription>,
111    /// The threads which are blocked on the I/O source, and the interest indicating
112    /// when they should be unblocked.
113    blocked_threads: BTreeMap<ThreadId, BlockingIoInterest>,
114}
115
116/// Manager for managing blocking host I/O in a non-blocking manner.
117/// We use [`Poll`] to poll for new I/O events from the OS for sources
118/// registered using this manager.
119///
120/// The semantics of this manager are that host I/O sources are registered
121/// to a [`Poll`] for their entire lifespan. Once host readiness events happen
122/// on a registered source, its internal epoll readiness gets updated -- even
123/// when the source isn't part of an active epoll instance. Also, for the entire
124/// lifespan of the source, threads can be added which should be unblocked
125/// once a certain [`BlockingIoSourceReadiness`] for an I/O source is satisfied.
126///
127/// Since blocking host I/O is inherently non-deterministic, no method on this
128/// manager should be called when isolation is enabled. The only exception is
129/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
130/// we assert that isolation is disabled!
131pub struct BlockingIoManager {
132    /// Poll instance to monitor I/O events from the OS.
133    /// This is only [`None`] when Miri is run with isolation enabled.
134    poll: Option<Poll>,
135    /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
136    /// This is not part of the state and only stored to avoid allocating a
137    /// new buffer for every poll.
138    events: Events,
139    /// Map from source file description ids to the actual sources and their
140    /// blocked threads.
141    sources: BTreeMap<FdId, BlockingIoSource>,
142}
143
144impl BlockingIoManager {
145    /// Create a new blocking I/O manager instance based on the availability
146    /// of communication with the host.
147    pub fn new(communicate: bool) -> Result<Self, io::Error> {
148        let manager = Self {
149            poll: communicate.then_some(Poll::new()?),
150            events: Events::with_capacity(IO_EVENT_CAPACITY),
151            sources: BTreeMap::default(),
152        };
153        Ok(manager)
154    }
155
156    /// Poll for new I/O events from the OS or wait until the timeout expired.
157    ///
158    /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
159    ///   reads all events since the last poll.
160    /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
161    ///   specified duration.
162    /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
163    ///
164    /// The events also immediately get processed: threads get unblocked, and epoll readiness gets updated.
165    pub fn poll<'tcx>(
166        ecx: &mut MiriInterpCx<'tcx>,
167        timeout: Option<Duration>,
168    ) -> InterpResult<'tcx, Result<(), io::Error>> {
169        let poll = ecx
170            .machine
171            .blocking_io
172            .poll
173            .as_mut()
174            .expect("Blocking I/O should not be called with isolation enabled");
175
176        // Poll for new I/O events from OS and store them in the events buffer.
177        if let Err(err) = poll.poll(&mut ecx.machine.blocking_io.events, timeout) {
178            return interp_ok(Err(err));
179        };
180
181        let event_fds = ecx
182            .machine
183            .blocking_io
184            .events
185            .iter()
186            .map(|event| {
187                let token = event.token();
188                // We know all tokens are valid `FdId`.
189                let fd_id = FdId::new_unchecked(token.0);
190                let source = ecx
191                    .machine
192                    .blocking_io
193                    .sources
194                    .get(&fd_id)
195                    .expect("Source should be registered");
196                let fd = source.fd.upgrade().expect(
197                    "Source file description shouldn't be destroyed whilst being registered",
198                );
199
200                assert_eq!(fd.id(), fd_id);
201                // Update the readiness of the source.
202                *fd.get_readiness_mut() |= BlockingIoSourceReadiness::from(event);
203                // Put FD into `event_fds` list.
204                fd
205            })
206            .collect::<Vec<_>>();
207
208        // Update the epoll readiness for all source file descriptions which received an event. Also,
209        // unblock the threads which are blocked on such a source and whose interests are now fulfilled.
210        for fd in event_fds.into_iter() {
211            // Update epoll readiness for the `fd` source.
212            ecx.update_epoll_active_events(fd.clone(), false)?;
213
214            let source =
215                ecx.machine.blocking_io.sources.get(&fd.id()).expect(
216                    "Source file description shouldn't be destroyed whilst being registered",
217                );
218
219            // List of all thread id's whose interests are currently fulfilled
220            // and which are blocked on the `fd` source. This also includes
221            // threads whose interests were already fulfilled before the
222            // `poll` invocation.
223            let threads = source
224                .blocked_threads
225                .iter()
226                .filter_map(|(thread_id, interest)| {
227                    fd.get_readiness_mut().fulfills_interest(interest).then_some(*thread_id)
228                })
229                .collect::<Vec<_>>();
230
231            // Unblock all threads whose interests are currently fulfilled and
232            // which are blocked on the `fd` source.
233            threads
234                .into_iter()
235                .try_for_each(|thread_id| ecx.unblock_thread(thread_id, BlockReason::IO))?;
236        }
237
238        interp_ok(Ok(()))
239    }
240
241    /// Return whether a source file description is currently registered in the
242    /// blocking I/O poll.
243    /// This can also be used to check whether a file description is a host
244    /// I/O source.
245    pub fn contains_source(&self, source_id: &FdId) -> bool {
246        self.sources.contains_key(source_id)
247    }
248
249    /// Register a source file description to the blocking I/O poll.
250    pub fn register(&mut self, source_fd: FileDescriptionRef<dyn SourceFileDescription>) {
251        let poll =
252            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
253
254        let id = source_fd.id();
255        let token = Token(id.to_usize());
256
257        // All possible interests.
258        // We only care about the readable and writable interests because those are the only
259        // interests which are available on all platforms. Internally, mio also
260        // registers an error interest.
261        let interest = Interest::READABLE | Interest::WRITABLE;
262
263        // Treat errors from registering as fatal. On UNIX hosts this can only
264        // fail due to system resource errors (e.g. ENOMEM or ENOSPC) or when the source is already registered.
265        source_fd
266            .with_source(&mut |source| poll.registry().register(source, token, interest))
267            .unwrap();
268
269        let source = BlockingIoSource {
270            fd: FileDescriptionRef::downgrade(&source_fd),
271            blocked_threads: BTreeMap::default(),
272        };
273
274        self.sources
275            .try_insert(id, source)
276            .unwrap_or_else(|_| panic!("Source should not already be registered"));
277    }
278
279    /// Deregister a source file description from the blocking I/O poll.
280    ///
281    /// It's assumed that the file description with id `source_id` is already
282    /// removed from the file description table.
283    pub fn deregister(&mut self, source_id: FdId, source: impl SourceFileDescription) {
284        let poll =
285            self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
286
287        let stored_source = self.sources.remove(&source_id).expect("Source should be registered");
288        // Ensure that the source file description is already removed from the file
289        // description table.
290        assert!(
291            stored_source.fd.upgrade().is_none(),
292            "Sources must only be deregistered when they are destroyed"
293        );
294
295        // Because we only store `WeakFileDescriptionRef`s and the `stored_source` file description
296        // is already destroyed, the weak reference can no longer be upgraded. Thus, we cannot use
297        // it to deregister the source from the poll and instead use the `source` argument to deregister.
298
299        // Treat errors from deregistering as fatal. On UNIX hosts this can only
300        // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
301        source.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
302    }
303
304    /// Add a new blocked thread to a registered source. The thread gets unblocked
305    /// once its [`BlockingIoInterest`] is fulfilled when calling
306    /// [`BlockingIoManager::poll`].
307    ///
308    /// It's assumed that the thread of `thread_id` isn't already blocked on
309    /// the source with id `source_id` and that this source is currently
310    /// registered.
311    fn add_blocked_thread(
312        &mut self,
313        source_id: FdId,
314        thread_id: ThreadId,
315        interest: BlockingIoInterest,
316    ) {
317        let source = self.sources.get_mut(&source_id).expect("Source should be registered");
318
319        source
320            .blocked_threads
321            .try_insert(thread_id, interest)
322            .expect("Thread cannot be blocked multiple times on the same source");
323    }
324
325    /// Remove a blocked thread from a registered source.
326    ///
327    /// It's assumed that the thread of `thread_id` is blocked on the
328    /// source with id `source_id` and that this source is currently
329    /// registered.
330    pub fn remove_blocked_thread(&mut self, source_id: FdId, thread_id: ThreadId) {
331        let source = self.sources.get_mut(&source_id).expect("Source should be registered");
332        source.blocked_threads.remove(&thread_id).expect("Thread should be blocked on source");
333    }
334}
335
336impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
337pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
338    /// Block the current thread until some interests on an I/O source
339    /// are fulfilled or the optional timeout exceeded.
340    /// The callback will be invoked when the thread gets unblocked.
341    ///
342    /// Note that an error interest is implicitly added to `interest`.
343    /// This means that the thread will also be unblocked when the error
344    /// readiness gets set for the source even when the requested interest
345    /// might not be fulfilled.
346    ///
347    /// There can also be spurious wake-ups by the OS and thus it's the callers
348    /// responsibility to verify that the requested I/O interests are
349    /// really ready and to block again if they're not.
350    ///
351    /// It's the callers responsibility to remove the [`BlockingIoInterest`]
352    /// from the blocking I/O manager in the provided callback function.
353    #[inline]
354    fn block_thread_for_io(
355        &mut self,
356        source_fd: FileDescriptionRef<dyn SourceFileDescription>,
357        interest: BlockingIoInterest,
358        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
359        callback: DynUnblockCallback<'tcx>,
360    ) -> InterpResult<'tcx> {
361        let this = self.eval_context_mut();
362
363        // We always have to do this since the thread will de-register itself.
364        this.machine.blocking_io.add_blocked_thread(source_fd.id(), this.active_thread(), interest);
365
366        if source_fd.get_readiness_mut().fulfills_interest(&interest) {
367            // The requested readiness is currently already fulfilled for the provided source.
368            // Instead of actually blocking the thread, we just run the callback function.
369            callback.call(this, UnblockKind::Ready)
370        } else {
371            // The I/O readiness is currently not fulfilled. We block the thread
372            // until the readiness is fulfilled and execute the callback then.
373            this.block_thread(BlockReason::IO, timeout, callback);
374            interp_ok(())
375        }
376    }
377}