// miri/shims/unix/virtual_socket.rs

//! This implements "virtual" sockets, that do not correspond to anything on the host system and
//! are entirely implemented inside Miri.
//! This is used to implement `socketpair` and `pipe`.
4
5use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io::{self, ErrorKind, Read};
8
9use rustc_target::spec::Os;
10
11use crate::concurrency::VClock;
12use crate::shims::files::{
13    EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
14};
15use crate::shims::unix::UnixFileDescription;
16use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
17use crate::*;
18
/// The maximum capacity of the socketpair buffer in bytes.
/// This number is arbitrary as the value can always
/// be configured in the real system.
/// (0x34000 = 212992 bytes = 208 KiB.)
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 0x34000;
23
/// Distinguishes the kinds of file descriptors that are backed by a `VirtualSocket`.
#[derive(Debug, PartialEq)]
enum VirtualSocketType {
    /// Either end of the socketpair fd.
    Socketpair,
    /// Read end of the pipe.
    PipeRead,
    /// Write end of the pipe.
    PipeWrite,
}
33
/// One end of a pair of connected virtual sockets.
#[derive(Debug)]
struct VirtualSocket {
    /// The buffer we are reading from, or `None` if this is the writing end of a pipe.
    /// (In that case, the peer FD will be the reading end of that pipe.)
    readbuf: Option<RefCell<Buffer>>,
    /// The `VirtualSocket` file descriptor that is our "peer", and that holds the buffer we are
    /// writing to. This is a weak reference because the other side may be closed before us; all
    /// future writes will then trigger EPIPE.
    peer_fd: OnceCell<WeakFileDescriptionRef<VirtualSocket>>,
    /// Indicates whether the peer has lost data when the file description is closed.
    /// This flag is set to `true` if the peer's `readbuf` is non-empty at the time
    /// of closure.
    peer_lost_data: Cell<bool>,
    /// A list of thread ids blocked because the buffer was empty.
    /// Once another thread writes some bytes, these threads will be unblocked.
    blocked_read_tid: RefCell<Vec<ThreadId>>,
    /// A list of thread ids blocked because the buffer was full.
    /// Once another thread reads some bytes, these threads will be unblocked.
    blocked_write_tid: RefCell<Vec<ThreadId>>,
    /// Whether this fd is non-blocking or not.
    is_nonblock: Cell<bool>,
    /// Differentiates between the virtual socket fd types (socketpair vs. pipe ends).
    fd_type: VirtualSocketType,
}
59
/// The data buffer of one `VirtualSocket`, together with the vector clock used to
/// establish synchronization between the threads that write to and read from it.
#[derive(Debug)]
struct Buffer {
    // FIFO byte queue: writes append at the back, reads consume from the front.
    buf: VecDeque<u8>,
    // Join of the release clocks of all writes to this buffer; a read acquires this clock.
    // FIXME(as noted at the read site): this over-synchronizes compared to per-byte tracking.
    clock: VClock,
}
65
66impl Buffer {
67    fn new() -> Self {
68        Buffer { buf: VecDeque::new(), clock: VClock::default() }
69    }
70}
71
impl VirtualSocket {
    /// Returns the (weak reference to the) peer of this socket.
    ///
    /// Panics if the peer has not been initialized; `socketpair` and `pipe2` set it
    /// immediately after creating both file descriptions, so this cannot fail in practice.
    fn peer_fd(&self) -> &WeakFileDescriptionRef<VirtualSocket> {
        self.peer_fd.get().unwrap()
    }
}
77
impl FileDescription for VirtualSocket {
    /// The name used for this FD in error messages and diagnostics.
    fn name(&self) -> &'static str {
        match self.fd_type {
            VirtualSocketType::Socketpair => "socketpair",
            VirtualSocketType::PipeRead | VirtualSocketType::PipeWrite => "pipe",
        }
    }

    /// Returns the file-type mode constant (`st_mode`) for this FD: a socket for
    /// socketpairs, a FIFO for either end of a pipe.
    fn metadata<'tcx>(
        &self,
    ) -> InterpResult<'tcx, Either<io::Result<std::fs::Metadata>, &'static str>> {
        let mode_name = match self.fd_type {
            VirtualSocketType::Socketpair => "S_IFSOCK",
            VirtualSocketType::PipeRead | VirtualSocketType::PipeWrite => "S_IFIFO",
        };
        interp_ok(Either::Right(mode_name))
    }

    /// Called when the last reference to this file description goes away.
    /// Records potential data loss on the peer and notifies it of the closure.
    fn destroy<'tcx>(
        self,
        _self_id: FdId,
        _communicate_allowed: bool,
        ecx: &mut MiriInterpCx<'tcx>,
    ) -> InterpResult<'tcx, io::Result<()>> {
        if let Some(peer_fd) = self.peer_fd().upgrade() {
            // If the current readbuf is non-empty when the file description is closed,
            // notify the peer that data loss has happened in the current file description.
            if let Some(readbuf) = &self.readbuf {
                if !readbuf.borrow().buf.is_empty() {
                    peer_fd.peer_lost_data.set(true);
                }
            }
            // Notify peer fd that close has happened, since that can unblock reads and writes.
            ecx.update_epoll_active_events(peer_fd, /* force_edge */ false)?;
        }
        interp_ok(Ok(()))
    }

    /// Reads up to `len` bytes into `ptr`; may block the calling thread
    /// (see `virtual_socket_read` for the full semantics).
    fn read<'tcx>(
        self: FileDescriptionRef<Self>,
        _communicate_allowed: bool,
        ptr: Pointer,
        len: usize,
        ecx: &mut MiriInterpCx<'tcx>,
        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
    ) -> InterpResult<'tcx> {
        virtual_socket_read(self, ptr, len, ecx, finish)
    }

    /// Writes up to `len` bytes from `ptr`; may block the calling thread
    /// (see `virtual_socket_write` for the full semantics).
    fn write<'tcx>(
        self: FileDescriptionRef<Self>,
        _communicate_allowed: bool,
        ptr: Pointer,
        len: usize,
        ecx: &mut MiriInterpCx<'tcx>,
        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
    ) -> InterpResult<'tcx> {
        virtual_socket_write(self, ptr, len, ecx, finish)
    }

    /// Whether Miri may non-deterministically shorten reads/writes on this FD.
    fn short_fd_operations(&self) -> bool {
        // Linux de-facto guarantees (or at least, applications like tokio assume [1, 2]) that
        // when a read/write on a streaming socket comes back short, the kernel buffer is
        // empty/full. So we can't do short reads/writes here.
        //
        // [1]: https://github.com/tokio-rs/tokio/blob/6c03e03898d71eca976ee1ad8481cf112ae722ba/tokio/src/io/poll_evented.rs#L182
        // [2]: https://github.com/tokio-rs/tokio/blob/6c03e03898d71eca976ee1ad8481cf112ae722ba/tokio/src/io/poll_evented.rs#L240
        false
    }

    /// This FD supports the Unix-specific operations (e.g. epoll).
    fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
        self
    }

    /// Implements `fcntl(F_GETFL)`: reconstructs the open flags from the FD type
    /// and the current non-blocking state.
    fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
        let mut flags = 0;

        // Get flag for file access mode.
        // The flag for both socketpair and pipe will remain the same even when the peer
        // fd is closed, so we need to look at the original type of this socket, not at whether
        // the peer socket still exists.
        match self.fd_type {
            VirtualSocketType::Socketpair => {
                flags |= ecx.eval_libc_i32("O_RDWR");
            }
            VirtualSocketType::PipeRead => {
                flags |= ecx.eval_libc_i32("O_RDONLY");
            }
            VirtualSocketType::PipeWrite => {
                flags |= ecx.eval_libc_i32("O_WRONLY");
            }
        }

        // Get flag for blocking status.
        if self.is_nonblock.get() {
            flags |= ecx.eval_libc_i32("O_NONBLOCK");
        }

        interp_ok(Scalar::from_i32(flags))
    }

    /// Implements `fcntl(F_SETFL)`: only `O_NONBLOCK` may be toggled; any other
    /// flag is reported as unsupported.
    fn set_flags<'tcx>(
        &self,
        mut flag: i32,
        ecx: &mut MiriInterpCx<'tcx>,
    ) -> InterpResult<'tcx, Scalar> {
        let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");

        // O_NONBLOCK flag can be set / unset by user.
        if flag & o_nonblock == o_nonblock {
            self.is_nonblock.set(true);
            flag &= !o_nonblock;
        } else {
            self.is_nonblock.set(false);
        }

        // Throw error if there is any unsupported flag.
        if flag != 0 {
            throw_unsup_format!(
                "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
            )
        }

        interp_ok(Scalar::from_i32(0))
    }
}
204
/// Write to VirtualSocket based on the space available and return the written byte size.
///
/// Writes go into the *peer's* `readbuf`. If the buffer is full, this either fails with
/// `EWOULDBLOCK` (non-blocking FDs) or blocks the current thread until a reader makes
/// room, at which point the whole operation is retried from the start.
fn virtual_socket_write<'tcx>(
    self_ref: FileDescriptionRef<VirtualSocket>,
    ptr: Pointer,
    len: usize,
    ecx: &mut MiriInterpCx<'tcx>,
    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
) -> InterpResult<'tcx> {
    // Always succeed on write size 0.
    // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
    if len == 0 {
        return finish.call(ecx, Ok(0));
    }

    // We are writing to our peer's readbuf.
    let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
        // If the upgrade from Weak to Rc fails, it indicates that all read ends have been
        // closed. It is an error to write even if there would be space.
        return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
    };

    let Some(writebuf) = &peer_fd.readbuf else {
        // Writing to the read end of a pipe.
        return finish.call(ecx, Err(IoError::LibcError("EBADF")));
    };

    // Let's see if we can write.
    let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
    if available_space == 0 {
        if self_ref.is_nonblock.get() {
            // Non-blocking socketpair with a full buffer.
            return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
        } else {
            // Register ourselves so the peer's read can unblock us later.
            self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
            // Blocking socketpair with a full buffer.
            // Block the current thread; only keep a weak ref for this.
            let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
            ecx.block_thread(
                BlockReason::VirtualSocket,
                None,
                callback!(
                    @capture<'tcx> {
                        weak_self_ref: WeakFileDescriptionRef<VirtualSocket>,
                        ptr: Pointer,
                        len: usize,
                        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
                    }
                    |this, unblock: UnblockKind| {
                        assert_eq!(unblock, UnblockKind::Ready);
                        // If we got unblocked, then our peer successfully upgraded its weak
                        // ref to us. That means we can also upgrade our weak ref.
                        let self_ref = weak_self_ref.upgrade().unwrap();
                        // Retry the whole write from scratch (the buffer may have changed).
                        virtual_socket_write(self_ref, ptr, len, this, finish)
                    }
                ),
            );
        }
    } else {
        // There is space to write!
        let mut writebuf = writebuf.borrow_mut();
        // Remember this clock so `read` can synchronize with us.
        ecx.release_clock(|clock| {
            writebuf.clock.join(clock);
        })?;
        // Do full write / partial write based on the space available.
        let write_size = len.min(available_space);
        let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
        assert_eq!(actual_write_size, write_size);

        // Need to stop accessing peer_fd so that it can be notified.
        drop(writebuf);

        // Unblock all threads that are currently blocked on peer_fd's read.
        let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
        // FIXME: We can randomize the order of unblocking.
        for thread_id in waiting_threads {
            ecx.unblock_thread(thread_id, BlockReason::VirtualSocket)?;
        }
        // Notify epoll waiters: we might be no longer writable, peer might now be readable.
        // The notification to the peer seems to be always sent on Linux, even if the
        // FD was readable before.
        ecx.update_epoll_active_events(self_ref, /* force_edge */ false)?;
        ecx.update_epoll_active_events(peer_fd, /* force_edge */ true)?;

        return finish.call(ecx, Ok(write_size));
    }
    interp_ok(())
}
293
/// Read from VirtualSocket and return the number of bytes read.
///
/// Reads come from this FD's own `readbuf`. An empty buffer yields EOF (0 bytes) if the
/// peer is gone, `EWOULDBLOCK` for non-blocking FDs, or blocks the current thread until
/// a writer provides data, at which point the whole operation is retried from the start.
fn virtual_socket_read<'tcx>(
    self_ref: FileDescriptionRef<VirtualSocket>,
    ptr: Pointer,
    len: usize,
    ecx: &mut MiriInterpCx<'tcx>,
    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
) -> InterpResult<'tcx> {
    // Always succeed on read size 0.
    if len == 0 {
        return finish.call(ecx, Ok(0));
    }

    let Some(readbuf) = &self_ref.readbuf else {
        // FIXME: This should return EBADF, but there's no nice way to do that as there's no
        // corresponding ErrorKind variant.
        throw_unsup_format!("reading from the write end of a pipe")
    };

    if readbuf.borrow_mut().buf.is_empty() {
        if self_ref.peer_fd().upgrade().is_none() {
            // Socketpair with no peer and empty buffer.
            // 0 bytes successfully read indicates end-of-file.
            return finish.call(ecx, Ok(0));
        } else if self_ref.is_nonblock.get() {
            // Non-blocking socketpair with writer and empty buffer.
            // https://linux.die.net/man/2/read
            // EAGAIN or EWOULDBLOCK can be returned for socket,
            // POSIX.1-2001 allows either error to be returned for this case.
            // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
            return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
        } else {
            // Register ourselves so the peer's write can unblock us later.
            self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
            // Blocking socketpair with writer and empty buffer.
            // Block the current thread; only keep a weak ref for this.
            let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
            ecx.block_thread(
                BlockReason::VirtualSocket,
                None,
                callback!(
                    @capture<'tcx> {
                        weak_self_ref: WeakFileDescriptionRef<VirtualSocket>,
                        ptr: Pointer,
                        len: usize,
                        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
                    }
                    |this, unblock: UnblockKind| {
                        assert_eq!(unblock, UnblockKind::Ready);
                        // If we got unblocked, then our peer successfully upgraded its weak
                        // ref to us. That means we can also upgrade our weak ref.
                        let self_ref = weak_self_ref.upgrade().unwrap();
                        // Retry the whole read from scratch (the buffer may have changed).
                        virtual_socket_read(self_ref, ptr, len, this, finish)
                    }
                ),
            );
        }
    } else {
        // There's data to be read!
        let mut readbuf = readbuf.borrow_mut();
        // Synchronize with all previous writes to this buffer.
        // FIXME: this over-synchronizes; a more precise approach would be to
        // only sync with the writes whose data we will read.
        ecx.acquire_clock(&readbuf.clock)?;

        // Do full read / partial read based on the space available.
        // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
        let read_size = ecx.read_from_host(|buf| readbuf.buf.read(buf), len, ptr)?.unwrap();
        let readbuf_now_empty = readbuf.buf.is_empty();

        // Need to drop before others can access the readbuf again.
        drop(readbuf);

        // A notification should be provided for the peer file description even when it can
        // only write 1 byte. This implementation is not compliant with the actual Linux kernel
        // implementation. For optimization reasons, the kernel will only mark the file description
        // as "writable" when it can write more than a certain number of bytes. Since we
        // don't know what that *certain number* is, we will provide a notification every time
        // a read is successful. This might result in our epoll emulation providing more
        // notifications than the real system.
        if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
            // Unblock all threads that are currently blocked on peer_fd's write.
            let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
            // FIXME: We can randomize the order of unblocking.
            for thread_id in waiting_threads {
                ecx.unblock_thread(thread_id, BlockReason::VirtualSocket)?;
            }
            // Notify epoll waiters: peer is now writable.
            // Linux seems to always notify the peer if the read buffer is now empty.
            // (Linux also does that if this was a "big" read, but to avoid some arbitrary
            // threshold, we do not match that.)
            ecx.update_epoll_active_events(peer_fd, /* force_edge */ readbuf_now_empty)?;
        };
        // Notify epoll waiters: we might be no longer readable.
        ecx.update_epoll_active_events(self_ref, /* force_edge */ false)?;

        return finish.call(ecx, Ok(read_size));
    }
    interp_ok(())
}
393
impl UnixFileDescription for VirtualSocket {
    /// Computes the set of epoll events that are currently active for this FD,
    /// based on buffer fill levels and whether the peer still exists.
    fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
        // We only check the status of EPOLLIN, EPOLLOUT, EPOLLHUP and EPOLLRDHUP flags.
        // If other event flags need to be supported in the future, the check should be added here.

        let mut epoll_ready_events = EpollEvents::new();

        // Check if it is readable.
        if let Some(readbuf) = &self.readbuf {
            if !readbuf.borrow().buf.is_empty() {
                epoll_ready_events.epollin = true;
            }
        } else {
            // Without a read buffer, reading never blocks, so we are always ready.
            epoll_ready_events.epollin = true;
        }

        // Check if it is writable.
        if let Some(peer_fd) = self.peer_fd().upgrade() {
            if let Some(writebuf) = &peer_fd.readbuf {
                let data_size = writebuf.borrow().buf.len();
                let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
                if available_space != 0 {
                    epoll_ready_events.epollout = true;
                }
            } else {
                // Without a write buffer, writing never blocks.
                epoll_ready_events.epollout = true;
            }
        } else {
            // Peer FD has been closed. This always sets both the RDHUP and HUP flags
            // as we do not support `shutdown` that could be used to partially close the stream.
            epoll_ready_events.epollrdhup = true;
            epoll_ready_events.epollhup = true;
            // Since the peer is closed, even if no data is available reads will return EOF and
            // writes will return EPIPE. In other words, they won't block, so we mark this as ready
            // for read and write.
            epoll_ready_events.epollin = true;
            epoll_ready_events.epollout = true;
            // If there is data lost in peer_fd, set EPOLLERR.
            if self.peer_lost_data.get() {
                epoll_ready_events.epollerr = true;
            }
        }
        interp_ok(epoll_ready_events)
    }
}
441
// Attach the extension trait below to the Miri interpreter context (all methods have
// default bodies, so the impl itself is empty).
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
/// Extension trait implementing the `socketpair` and `pipe`/`pipe2` shims.
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
    /// For more information on the arguments see the socketpair manpage:
    /// <https://linux.die.net/man/2/socketpair>
    ///
    /// Creates two connected `VirtualSocket` file descriptions and writes their
    /// file descriptor numbers into the 2-element array pointed to by `sv`.
    /// Returns 0 on success (errors are reported via `throw_unsup_format!`).
    fn socketpair(
        &mut self,
        domain: &OpTy<'tcx>,
        type_: &OpTy<'tcx>,
        protocol: &OpTy<'tcx>,
        sv: &OpTy<'tcx>,
    ) -> InterpResult<'tcx, Scalar> {
        let this = self.eval_context_mut();

        let domain = this.read_scalar(domain)?.to_i32()?;
        let mut flags = this.read_scalar(type_)?.to_i32()?;
        let protocol = this.read_scalar(protocol)?.to_i32()?;
        // This is really a pointer to `[i32; 2]` but we use a ptr-to-first-element representation.
        let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;

        let mut is_sock_nonblock = false;

        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
        // if there is anything left at the end, that's an unsupported flag.
        if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android) {
            // SOCK_NONBLOCK only exists on Linux.
            let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
            let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
            if flags & sock_nonblock == sock_nonblock {
                is_sock_nonblock = true;
                flags &= !sock_nonblock;
            }
            // As with `pipe2` below, CLOEXEC is accepted but otherwise ignored.
            if flags & sock_cloexec == sock_cloexec {
                flags &= !sock_cloexec;
            }
        }

        // Fail on unsupported input.
        // AF_UNIX and AF_LOCAL are synonyms, so we accept both in case
        // their values differ.
        if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
            throw_unsup_format!(
                "socketpair: domain {:#x} is unsupported, only AF_UNIX \
                                 and AF_LOCAL are allowed",
                domain
            );
        } else if flags != this.eval_libc_i32("SOCK_STREAM") {
            throw_unsup_format!(
                "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
                                 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
                flags
            );
        } else if protocol != 0 {
            throw_unsup_format!(
                "socketpair: socket protocol {protocol} is unsupported, \
                                 only 0 is allowed",
            );
        }

        // Generate file descriptions.
        // Both ends are symmetric: each has its own read buffer.
        let fds = &mut this.machine.fds;
        let fd0 = fds.new_ref(VirtualSocket {
            readbuf: Some(RefCell::new(Buffer::new())),
            peer_fd: OnceCell::new(),
            peer_lost_data: Cell::new(false),
            blocked_read_tid: RefCell::new(Vec::new()),
            blocked_write_tid: RefCell::new(Vec::new()),
            is_nonblock: Cell::new(is_sock_nonblock),
            fd_type: VirtualSocketType::Socketpair,
        });
        let fd1 = fds.new_ref(VirtualSocket {
            readbuf: Some(RefCell::new(Buffer::new())),
            peer_fd: OnceCell::new(),
            peer_lost_data: Cell::new(false),
            blocked_read_tid: RefCell::new(Vec::new()),
            blocked_write_tid: RefCell::new(Vec::new()),
            is_nonblock: Cell::new(is_sock_nonblock),
            fd_type: VirtualSocketType::Socketpair,
        });

        // Make the file descriptions point to each other.
        fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
        fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();

        // Insert the file description into the fd table, generating the file descriptors.
        let sv0 = fds.insert(fd0);
        let sv1 = fds.insert(fd1);

        // Return socketpair file descriptors to the caller.
        let sv0 = Scalar::from_int(sv0, sv.layout.size);
        let sv1 = Scalar::from_int(sv1, sv.layout.size);
        this.write_scalar(sv0, &sv)?;
        this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;

        interp_ok(Scalar::from_i32(0))
    }

    /// Implements `pipe` (when `flags` is `None`) and `pipe2`.
    ///
    /// Creates a unidirectional pair of `VirtualSocket` file descriptions and writes
    /// their file descriptor numbers into the 2-element array pointed to by `pipefd`
    /// (index 0 = read end, index 1 = write end). Returns 0 on success.
    fn pipe2(
        &mut self,
        pipefd: &OpTy<'tcx>,
        flags: Option<&OpTy<'tcx>>,
    ) -> InterpResult<'tcx, Scalar> {
        let this = self.eval_context_mut();

        let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
        let mut flags = match flags {
            Some(flags) => this.read_scalar(flags)?.to_i32()?,
            None => 0,
        };

        let cloexec = this.eval_libc_i32("O_CLOEXEC");
        let o_nonblock = this.eval_libc_i32("O_NONBLOCK");

        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
        // if there is anything left at the end, that's an unsupported flag.
        let mut is_nonblock = false;
        if flags & o_nonblock == o_nonblock {
            is_nonblock = true;
            flags &= !o_nonblock;
        }
        // As usual we ignore CLOEXEC.
        if flags & cloexec == cloexec {
            flags &= !cloexec;
        }
        if flags != 0 {
            throw_unsup_format!("unsupported flags in `pipe2`");
        }

        // Generate file descriptions.
        // pipefd[0] refers to the read end of the pipe.
        let fds = &mut this.machine.fds;
        let fd0 = fds.new_ref(VirtualSocket {
            readbuf: Some(RefCell::new(Buffer::new())),
            peer_fd: OnceCell::new(),
            peer_lost_data: Cell::new(false),
            blocked_read_tid: RefCell::new(Vec::new()),
            blocked_write_tid: RefCell::new(Vec::new()),
            is_nonblock: Cell::new(is_nonblock),
            fd_type: VirtualSocketType::PipeRead,
        });
        // The write end has no read buffer (`readbuf: None`); see `virtual_socket_write`.
        let fd1 = fds.new_ref(VirtualSocket {
            readbuf: None,
            peer_fd: OnceCell::new(),
            peer_lost_data: Cell::new(false),
            blocked_read_tid: RefCell::new(Vec::new()),
            blocked_write_tid: RefCell::new(Vec::new()),
            is_nonblock: Cell::new(is_nonblock),
            fd_type: VirtualSocketType::PipeWrite,
        });

        // Make the file descriptions point to each other.
        fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
        fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();

        // Insert the file description into the fd table, generating the file descriptors.
        let pipefd0 = fds.insert(fd0);
        let pipefd1 = fds.insert(fd1);

        // Return file descriptors to the caller.
        let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
        let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
        this.write_scalar(pipefd0, &pipefd)?;
        this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;

        interp_ok(Scalar::from_i32(0))
    }
}