miri/concurrency/blocking_io.rs
1use std::collections::BTreeMap;
2use std::io;
3use std::time::Duration;
4
5use mio::event::Source;
6use mio::{Events, Interest, Poll, Token};
7use rustc_data_structures::fx::FxHashMap;
8
9use crate::shims::{FdId, FileDescriptionRef};
10use crate::*;
11
/// Maximum number of I/O events fetched from the OS by a single poll.
///
/// Only few blocking I/O operations are expected to be pending at the
/// same time, so a small buffer is sufficient.
const IO_EVENT_CAPACITY: usize = 16;
16
17/// Trait for values that contain a mio [`Source`].
18pub trait WithSource {
19 /// Invoke `f` on the source inside `self`.
20 fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
21}
22
23/// An interest receiver defines the action that should be taken when
24/// the associated [`Interest`] is fulfilled.
25#[derive(Debug, Hash, PartialEq, Clone, Copy, Eq, PartialOrd, Ord)]
26pub enum InterestReceiver {
27 /// The specified thread should be unblocked.
28 UnblockThread(ThreadId),
29}
30
31/// Manager for managing blocking host I/O in a non-blocking manner.
32/// We use [`Poll`] to poll for new I/O events from the OS for sources
33/// registered using this manager.
34///
35/// Since blocking host I/O is inherently non-deterministic, no method on this
36/// manager should be called when isolation is enabled. The only exception is
37/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
38/// we assert that isolation is disabled!
39pub struct BlockingIoManager {
40 /// Poll instance to monitor I/O events from the OS.
41 /// This is only [`None`] when Miri is run with isolation enabled.
42 poll: Option<Poll>,
43 /// Buffer used to store the ready I/O events when calling [`Poll::poll`].
44 /// This is not part of the state and only stored to avoid allocating a
45 /// new buffer for every poll.
46 events: Events,
47 /// Map from source ids to the actual sources and their registered receivers
48 /// together with their associated interests.
49 sources:
50 BTreeMap<FdId, (FileDescriptionRef<dyn WithSource>, FxHashMap<InterestReceiver, Interest>)>,
51}
52
53impl BlockingIoManager {
54 /// Create a new blocking I/O manager instance based on the availability
55 /// of communication with the host.
56 pub fn new(communicate: bool) -> Result<Self, io::Error> {
57 let manager = Self {
58 poll: communicate.then_some(Poll::new()?),
59 events: Events::with_capacity(IO_EVENT_CAPACITY),
60 sources: BTreeMap::default(),
61 };
62 Ok(manager)
63 }
64
65 /// Poll for new I/O events from the OS or wait until the timeout expired.
66 ///
67 /// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
68 /// reads all events since the last poll.
69 /// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
70 /// specified duration.
71 /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
72 ///
73 /// Returns the interest receivers for all file descriptions which received an I/O event together
74 /// with the file description they were registered for.
75 pub fn poll(
76 &mut self,
77 timeout: Option<Duration>,
78 ) -> Result<Vec<(InterestReceiver, FileDescriptionRef<dyn WithSource>)>, io::Error> {
79 let poll =
80 self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
81
82 // Poll for new I/O events from OS and store them in the events buffer.
83 poll.poll(&mut self.events, timeout)?;
84
85 let ready = self
86 .events
87 .iter()
88 .flat_map(|event| {
89 let token = event.token();
90 // We know all tokens are valid `FdId`.
91 let fd_id = FdId::new_unchecked(token.0);
92 let (source, interests) =
93 self.sources.get(&fd_id).expect("Source should be registered");
94 assert_eq!(source.id(), fd_id);
95 // Because we allow spurious wake-ups, we mark all interests as ready even
96 // though some may not have been fulfilled.
97 interests.keys().map(move |receiver| (*receiver, source.clone()))
98 })
99 .collect::<Vec<_>>();
100
101 // Deregister all ready sources as we only want to receive one event per receiver.
102 ready.iter().for_each(|(receiver, source)| self.deregister(source.id(), *receiver));
103
104 Ok(ready)
105 }
106
107 /// Register an interest for a blocking I/O source.
108 ///
109 /// As the OS can always produce spurious wake-ups, it's the callers responsibility to
110 /// verify the requested I/O interests are really ready and to register again if they're not.
111 ///
112 /// It's assumed that no interest is already registered for this source with the same reason!
113 pub fn register(
114 &mut self,
115 source_fd: FileDescriptionRef<dyn WithSource>,
116 receiver: InterestReceiver,
117 interest: Interest,
118 ) {
119 let poll =
120 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
121
122 let id = source_fd.id();
123 let token = Token(id.to_usize());
124
125 let Some((_, current_interests)) = self.sources.get_mut(&id) else {
126 // The source is not yet registered.
127
128 // Treat errors from registering as fatal. On UNIX hosts this can only
129 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
130 source_fd
131 .with_source(&mut |source| poll.registry().register(source, token, interest))
132 .unwrap();
133
134 self.sources.insert(id, (source_fd, FxHashMap::from_iter([(receiver, interest)])));
135 return;
136 };
137
138 // The source is already registered. We need to check whether we need to
139 // reregister because the provided interest contains new interests for the source.
140
141 let old_interest =
142 interest_union(current_interests).expect("Source should contain at least one interest");
143
144 current_interests
145 .try_insert(receiver, interest)
146 .unwrap_or_else(|_| panic!("Receiver should be unique"));
147
148 let new_interest = old_interest.add(interest);
149
150 // Reregister the source since the overall interests might have changed.
151
152 // Treat errors from reregistering as fatal. On UNIX hosts this can only
153 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
154 source_fd
155 .with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
156 .unwrap();
157 }
158
159 /// Deregister an interest from a blocking I/O source.
160 ///
161 /// The receiver is assumed to be registered for the provided source!
162 pub fn deregister(&mut self, source_id: FdId, receiver: InterestReceiver) {
163 let poll =
164 self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
165
166 let token = Token(source_id.to_usize());
167 let (fd, current_interests) =
168 self.sources.get_mut(&source_id).expect("Source should be registered");
169
170 current_interests
171 .remove(&receiver)
172 .unwrap_or_else(|| panic!("Receiver should be registered for source"));
173
174 let Some(new_interest) = interest_union(current_interests) else {
175 // There are no longer any interests in this source.
176 // We can thus deregister the source from the poll.
177
178 // Treat errors from deregistering as fatal. On UNIX hosts this can only
179 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
180 fd.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
181 self.sources.remove(&source_id);
182 return;
183 };
184
185 // Reregister the source since the overall interests might have changed.
186
187 // Treat errors from reregistering as fatal. On UNIX hosts this can only
188 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
189 fd.with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
190 .unwrap();
191 }
192}
193
194/// Get the union of all interests for a source. Returns `None` if the map is empty.
195fn interest_union(interests: &FxHashMap<InterestReceiver, Interest>) -> Option<Interest> {
196 interests
197 .values()
198 .copied()
199 .fold(None, |acc, interest| acc.map(|acc: Interest| acc.add(interest)).or(Some(interest)))
200}
201
202impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
203pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
204 /// Block the current thread until some interests on an I/O source
205 /// are fulfilled or the optional timeout exceeded.
206 /// The callback will be invoked when the thread gets unblocked.
207 ///
208 /// There can be spurious wake-ups by the OS and thus it's the callers
209 /// responsibility to verify that the requested I/O interests are
210 /// really ready and to block again if they're not.
211 #[inline]
212 fn block_thread_for_io(
213 &mut self,
214 source_fd: FileDescriptionRef<dyn WithSource>,
215 interests: Interest,
216 timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
217 callback: DynUnblockCallback<'tcx>,
218 ) {
219 let this = self.eval_context_mut();
220 this.machine.blocking_io.register(
221 source_fd,
222 InterestReceiver::UnblockThread(this.machine.threads.active_thread()),
223 interests,
224 );
225 this.block_thread(BlockReason::IO, timeout, callback);
226 }
227}