miri/shims/unix/
unnamed_socket.rs

//! This implements "anonymous" sockets, which do not correspond to anything on the host system
//! and are entirely implemented inside Miri.
3//! We also use the same infrastructure to implement unnamed pipes.
4
5use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use crate::concurrency::VClock;
11use crate::shims::files::{
12    EvalContextExt as _, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
13};
14use crate::shims::unix::UnixFileDescription;
15use crate::shims::unix::linux_like::epoll::{EpollReadyEvents, EvalContextExt as _};
16use crate::*;
17
/// Upper bound, in bytes, on how much unread data one end's buffer may hold.
///
/// It applies to socketpairs and pipes alike. The exact value is arbitrary: on a real system
/// the buffer size is configurable anyway.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212_992;
22
/// One end of a pair of connected unnamed sockets, or one end of an unnamed pipe.
///
/// Each end reads from its own `readbuf` and writes into its peer's `readbuf`.
#[derive(Debug)]
struct AnonSocket {
    /// The buffer we are reading from, or `None` if this is the writing end of a pipe.
    /// (In that case, the peer FD will be the reading end of that pipe.)
    readbuf: Option<RefCell<Buffer>>,
    /// The `AnonSocket` file descriptor that is our "peer", and that holds the buffer we are
    /// writing to. This is a weak reference because the other side may be closed before us; all
    /// future writes will then trigger EPIPE. It is set exactly once, right after both ends
    /// have been created.
    peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
    /// Set when the peer is closed while its own read buffer still holds unread bytes, i.e.
    /// data written from this end was never read. Reported as EPOLLERR on this end.
    peer_lost_data: Cell<bool>,
    /// Threads blocked reading from this end because `readbuf` was empty.
    /// They are unblocked once the peer writes some bytes.
    blocked_read_tid: RefCell<Vec<ThreadId>>,
    /// Threads blocked writing through this end because the peer's buffer was full.
    /// They are unblocked once the peer reads some bytes.
    blocked_write_tid: RefCell<Vec<ThreadId>>,
    /// Whether this end is non-blocking (`SOCK_NONBLOCK` / `O_NONBLOCK`). If so, reading from
    /// an empty buffer or writing to a full one fails with `WouldBlock` instead of blocking.
    is_nonblock: bool,
}
45
/// Bytes written by one end that the other end has not read yet.
#[derive(Debug)]
struct Buffer {
    /// The pending bytes, in the order they were written.
    buf: VecDeque<u8>,
    /// Joined release clocks of all writers to this buffer; a reader acquires it so that it
    /// synchronizes with those writes.
    clock: VClock,
}
51
52impl Buffer {
53    fn new() -> Self {
54        Buffer { buf: VecDeque::new(), clock: VClock::default() }
55    }
56}
57
58impl AnonSocket {
59    fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
60        self.peer_fd.get().unwrap()
61    }
62}
63
64impl FileDescription for AnonSocket {
65    fn name(&self) -> &'static str {
66        "socketpair"
67    }
68
69    fn close<'tcx>(
70        self,
71        _communicate_allowed: bool,
72        ecx: &mut MiriInterpCx<'tcx>,
73    ) -> InterpResult<'tcx, io::Result<()>> {
74        if let Some(peer_fd) = self.peer_fd().upgrade() {
75            // If the current readbuf is non-empty when the file description is closed,
76            // notify the peer that data lost has happened in current file description.
77            if let Some(readbuf) = &self.readbuf {
78                if !readbuf.borrow().buf.is_empty() {
79                    peer_fd.peer_lost_data.set(true);
80                }
81            }
82            // Notify peer fd that close has happened, since that can unblock reads and writes.
83            ecx.check_and_update_readiness(peer_fd)?;
84        }
85        interp_ok(Ok(()))
86    }
87
88    fn read<'tcx>(
89        self: FileDescriptionRef<Self>,
90        _communicate_allowed: bool,
91        ptr: Pointer,
92        len: usize,
93        ecx: &mut MiriInterpCx<'tcx>,
94        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
95    ) -> InterpResult<'tcx> {
96        anonsocket_read(self, ptr, len, ecx, finish)
97    }
98
99    fn write<'tcx>(
100        self: FileDescriptionRef<Self>,
101        _communicate_allowed: bool,
102        ptr: Pointer,
103        len: usize,
104        ecx: &mut MiriInterpCx<'tcx>,
105        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
106    ) -> InterpResult<'tcx> {
107        anonsocket_write(self, ptr, len, ecx, finish)
108    }
109
110    fn as_unix(&self) -> &dyn UnixFileDescription {
111        self
112    }
113}
114
115/// Write to AnonSocket based on the space available and return the written byte size.
116fn anonsocket_write<'tcx>(
117    self_ref: FileDescriptionRef<AnonSocket>,
118    ptr: Pointer,
119    len: usize,
120    ecx: &mut MiriInterpCx<'tcx>,
121    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
122) -> InterpResult<'tcx> {
123    // Always succeed on write size 0.
124    // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
125    if len == 0 {
126        return finish.call(ecx, Ok(0));
127    }
128
129    // We are writing to our peer's readbuf.
130    let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
131        // If the upgrade from Weak to Rc fails, it indicates that all read ends have been
132        // closed. It is an error to write even if there would be space.
133        return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
134    };
135
136    let Some(writebuf) = &peer_fd.readbuf else {
137        // Writing to the read end of a pipe.
138        return finish.call(ecx, Err(IoError::LibcError("EBADF")));
139    };
140
141    // Let's see if we can write.
142    let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
143    if available_space == 0 {
144        if self_ref.is_nonblock {
145            // Non-blocking socketpair with a full buffer.
146            return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
147        } else {
148            self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
149            // Blocking socketpair with a full buffer.
150            // Block the current thread; only keep a weak ref for this.
151            let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
152            ecx.block_thread(
153                BlockReason::UnnamedSocket,
154                None,
155                callback!(
156                    @capture<'tcx> {
157                        weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
158                        ptr: Pointer,
159                        len: usize,
160                        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
161                    }
162                    |this, unblock: UnblockKind| {
163                        assert_eq!(unblock, UnblockKind::Ready);
164                        // If we got unblocked, then our peer successfully upgraded its weak
165                        // ref to us. That means we can also upgrade our weak ref.
166                        let self_ref = weak_self_ref.upgrade().unwrap();
167                        anonsocket_write(self_ref, ptr, len, this, finish)
168                    }
169                ),
170            );
171        }
172    } else {
173        // There is space to write!
174        let mut writebuf = writebuf.borrow_mut();
175        // Remember this clock so `read` can synchronize with us.
176        ecx.release_clock(|clock| {
177            writebuf.clock.join(clock);
178        });
179        // Do full write / partial write based on the space available.
180        let write_size = len.min(available_space);
181        let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
182        assert_eq!(actual_write_size, write_size);
183
184        // Need to stop accessing peer_fd so that it can be notified.
185        drop(writebuf);
186
187        // Unblock all threads that are currently blocked on peer_fd's read.
188        let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
189        // FIXME: We can randomize the order of unblocking.
190        for thread_id in waiting_threads {
191            ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
192        }
193        // Notification should be provided for peer fd as it became readable.
194        // The kernel does this even if the fd was already readable before, so we follow suit.
195        ecx.check_and_update_readiness(peer_fd)?;
196
197        return finish.call(ecx, Ok(write_size));
198    }
199    interp_ok(())
200}
201
202/// Read from AnonSocket and return the number of bytes read.
203fn anonsocket_read<'tcx>(
204    self_ref: FileDescriptionRef<AnonSocket>,
205    ptr: Pointer,
206    len: usize,
207    ecx: &mut MiriInterpCx<'tcx>,
208    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
209) -> InterpResult<'tcx> {
210    // Always succeed on read size 0.
211    if len == 0 {
212        return finish.call(ecx, Ok(0));
213    }
214
215    let Some(readbuf) = &self_ref.readbuf else {
216        // FIXME: This should return EBADF, but there's no nice way to do that as there's no
217        // corresponding ErrorKind variant.
218        throw_unsup_format!("reading from the write end of a pipe")
219    };
220
221    if readbuf.borrow_mut().buf.is_empty() {
222        if self_ref.peer_fd().upgrade().is_none() {
223            // Socketpair with no peer and empty buffer.
224            // 0 bytes successfully read indicates end-of-file.
225            return finish.call(ecx, Ok(0));
226        } else if self_ref.is_nonblock {
227            // Non-blocking socketpair with writer and empty buffer.
228            // https://linux.die.net/man/2/read
229            // EAGAIN or EWOULDBLOCK can be returned for socket,
230            // POSIX.1-2001 allows either error to be returned for this case.
231            // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
232            return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
233        } else {
234            self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
235            // Blocking socketpair with writer and empty buffer.
236            // Block the current thread; only keep a weak ref for this.
237            let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
238            ecx.block_thread(
239                BlockReason::UnnamedSocket,
240                None,
241                callback!(
242                    @capture<'tcx> {
243                        weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
244                        ptr: Pointer,
245                        len: usize,
246                        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
247                    }
248                    |this, unblock: UnblockKind| {
249                        assert_eq!(unblock, UnblockKind::Ready);
250                        // If we got unblocked, then our peer successfully upgraded its weak
251                        // ref to us. That means we can also upgrade our weak ref.
252                        let self_ref = weak_self_ref.upgrade().unwrap();
253                        anonsocket_read(self_ref, ptr, len, this, finish)
254                    }
255                ),
256            );
257        }
258    } else {
259        // There's data to be read!
260        let mut readbuf = readbuf.borrow_mut();
261        // Synchronize with all previous writes to this buffer.
262        // FIXME: this over-synchronizes; a more precise approach would be to
263        // only sync with the writes whose data we will read.
264        ecx.acquire_clock(&readbuf.clock);
265
266        // Do full read / partial read based on the space available.
267        // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
268        let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
269
270        // Need to drop before others can access the readbuf again.
271        drop(readbuf);
272
273        // A notification should be provided for the peer file description even when it can
274        // only write 1 byte. This implementation is not compliant with the actual Linux kernel
275        // implementation. For optimization reasons, the kernel will only mark the file description
276        // as "writable" when it can write more than a certain number of bytes. Since we
277        // don't know what that *certain number* is, we will provide a notification every time
278        // a read is successful. This might result in our epoll emulation providing more
279        // notifications than the real system.
280        if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
281            // Unblock all threads that are currently blocked on peer_fd's write.
282            let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
283            // FIXME: We can randomize the order of unblocking.
284            for thread_id in waiting_threads {
285                ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
286            }
287            // Notify epoll waiters.
288            ecx.check_and_update_readiness(peer_fd)?;
289        };
290
291        return finish.call(ecx, Ok(read_size));
292    }
293    interp_ok(())
294}
295
296impl UnixFileDescription for AnonSocket {
297    fn get_epoll_ready_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadyEvents> {
298        // We only check the status of EPOLLIN, EPOLLOUT, EPOLLHUP and EPOLLRDHUP flags.
299        // If other event flags need to be supported in the future, the check should be added here.
300
301        let mut epoll_ready_events = EpollReadyEvents::new();
302
303        // Check if it is readable.
304        if let Some(readbuf) = &self.readbuf {
305            if !readbuf.borrow().buf.is_empty() {
306                epoll_ready_events.epollin = true;
307            }
308        } else {
309            // Without a read buffer, reading never blocks, so we are always ready.
310            epoll_ready_events.epollin = true;
311        }
312
313        // Check if is writable.
314        if let Some(peer_fd) = self.peer_fd().upgrade() {
315            if let Some(writebuf) = &peer_fd.readbuf {
316                let data_size = writebuf.borrow().buf.len();
317                let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
318                if available_space != 0 {
319                    epoll_ready_events.epollout = true;
320                }
321            } else {
322                // Without a write buffer, writing never blocks.
323                epoll_ready_events.epollout = true;
324            }
325        } else {
326            // Peer FD has been closed. This always sets both the RDHUP and HUP flags
327            // as we do not support `shutdown` that could be used to partially close the stream.
328            epoll_ready_events.epollrdhup = true;
329            epoll_ready_events.epollhup = true;
330            // Since the peer is closed, even if no data is available reads will return EOF and
331            // writes will return EPIPE. In other words, they won't block, so we mark this as ready
332            // for read and write.
333            epoll_ready_events.epollin = true;
334            epoll_ready_events.epollout = true;
335            // If there is data lost in peer_fd, set EPOLLERR.
336            if self.peer_lost_data.get() {
337                epoll_ready_events.epollerr = true;
338            }
339        }
340        interp_ok(epoll_ready_events)
341    }
342}
343
344impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
345pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
346    /// For more information on the arguments see the socketpair manpage:
347    /// <https://linux.die.net/man/2/socketpair>
348    fn socketpair(
349        &mut self,
350        domain: &OpTy<'tcx>,
351        type_: &OpTy<'tcx>,
352        protocol: &OpTy<'tcx>,
353        sv: &OpTy<'tcx>,
354    ) -> InterpResult<'tcx, Scalar> {
355        let this = self.eval_context_mut();
356
357        let domain = this.read_scalar(domain)?.to_i32()?;
358        let mut flags = this.read_scalar(type_)?.to_i32()?;
359        let protocol = this.read_scalar(protocol)?.to_i32()?;
360        // This is really a pointer to `[i32; 2]` but we use a ptr-to-first-element representation.
361        let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
362
363        let mut is_sock_nonblock = false;
364
365        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
366        // if there is anything left at the end, that's an unsupported flag.
367        if this.tcx.sess.target.os == "linux" {
368            // SOCK_NONBLOCK only exists on Linux.
369            let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
370            let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
371            if flags & sock_nonblock == sock_nonblock {
372                is_sock_nonblock = true;
373                flags &= !sock_nonblock;
374            }
375            if flags & sock_cloexec == sock_cloexec {
376                flags &= !sock_cloexec;
377            }
378        }
379
380        // Fail on unsupported input.
381        // AF_UNIX and AF_LOCAL are synonyms, so we accept both in case
382        // their values differ.
383        if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
384            throw_unsup_format!(
385                "socketpair: domain {:#x} is unsupported, only AF_UNIX \
386                                 and AF_LOCAL are allowed",
387                domain
388            );
389        } else if flags != this.eval_libc_i32("SOCK_STREAM") {
390            throw_unsup_format!(
391                "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
392                                 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
393                flags
394            );
395        } else if protocol != 0 {
396            throw_unsup_format!(
397                "socketpair: socket protocol {protocol} is unsupported, \
398                                 only 0 is allowed",
399            );
400        }
401
402        // Generate file descriptions.
403        let fds = &mut this.machine.fds;
404        let fd0 = fds.new_ref(AnonSocket {
405            readbuf: Some(RefCell::new(Buffer::new())),
406            peer_fd: OnceCell::new(),
407            peer_lost_data: Cell::new(false),
408            blocked_read_tid: RefCell::new(Vec::new()),
409            blocked_write_tid: RefCell::new(Vec::new()),
410            is_nonblock: is_sock_nonblock,
411        });
412        let fd1 = fds.new_ref(AnonSocket {
413            readbuf: Some(RefCell::new(Buffer::new())),
414            peer_fd: OnceCell::new(),
415            peer_lost_data: Cell::new(false),
416            blocked_read_tid: RefCell::new(Vec::new()),
417            blocked_write_tid: RefCell::new(Vec::new()),
418            is_nonblock: is_sock_nonblock,
419        });
420
421        // Make the file descriptions point to each other.
422        fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
423        fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
424
425        // Insert the file description to the fd table, generating the file descriptors.
426        let sv0 = fds.insert(fd0);
427        let sv1 = fds.insert(fd1);
428
429        // Return socketpair file descriptors to the caller.
430        let sv0 = Scalar::from_int(sv0, sv.layout.size);
431        let sv1 = Scalar::from_int(sv1, sv.layout.size);
432        this.write_scalar(sv0, &sv)?;
433        this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
434
435        interp_ok(Scalar::from_i32(0))
436    }
437
438    fn pipe2(
439        &mut self,
440        pipefd: &OpTy<'tcx>,
441        flags: Option<&OpTy<'tcx>>,
442    ) -> InterpResult<'tcx, Scalar> {
443        let this = self.eval_context_mut();
444
445        let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
446        let mut flags = match flags {
447            Some(flags) => this.read_scalar(flags)?.to_i32()?,
448            None => 0,
449        };
450
451        let cloexec = this.eval_libc_i32("O_CLOEXEC");
452        let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
453
454        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
455        // if there is anything left at the end, that's an unsupported flag.
456        let mut is_nonblock = false;
457        if flags & o_nonblock == o_nonblock {
458            is_nonblock = true;
459            flags &= !o_nonblock;
460        }
461        // As usual we ignore CLOEXEC.
462        if flags & cloexec == cloexec {
463            flags &= !cloexec;
464        }
465        if flags != 0 {
466            throw_unsup_format!("unsupported flags in `pipe2`");
467        }
468
469        // Generate file descriptions.
470        // pipefd[0] refers to the read end of the pipe.
471        let fds = &mut this.machine.fds;
472        let fd0 = fds.new_ref(AnonSocket {
473            readbuf: Some(RefCell::new(Buffer::new())),
474            peer_fd: OnceCell::new(),
475            peer_lost_data: Cell::new(false),
476            blocked_read_tid: RefCell::new(Vec::new()),
477            blocked_write_tid: RefCell::new(Vec::new()),
478            is_nonblock,
479        });
480        let fd1 = fds.new_ref(AnonSocket {
481            readbuf: None,
482            peer_fd: OnceCell::new(),
483            peer_lost_data: Cell::new(false),
484            blocked_read_tid: RefCell::new(Vec::new()),
485            blocked_write_tid: RefCell::new(Vec::new()),
486            is_nonblock,
487        });
488
489        // Make the file descriptions point to each other.
490        fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
491        fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
492
493        // Insert the file description to the fd table, generating the file descriptors.
494        let pipefd0 = fds.insert(fd0);
495        let pipefd1 = fds.insert(fd1);
496
497        // Return file descriptors to the caller.
498        let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
499        let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
500        this.write_scalar(pipefd0, &pipefd)?;
501        this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
502
503        interp_ok(Scalar::from_i32(0))
504    }
505}