miri/shims/unix/
unnamed_socket.rs

1//! This implements "anonymous" sockets, that do not correspond to anything on the host system and
2//! are entirely implemented inside Miri.
3//! We also use the same infrastructure to implement unnamed pipes.
4
5use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use rustc_target::spec::Os;
11
12use crate::concurrency::VClock;
13use crate::shims::files::{
14    EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
15};
16use crate::shims::unix::UnixFileDescription;
17use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
18use crate::*;
19
20/// The maximum capacity of the socketpair buffer in bytes.
21/// This number is arbitrary as the value can always
22/// be configured in the real system.
23const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 0x34000;
24
25#[derive(Debug, PartialEq)]
26enum AnonSocketType {
27    // Either end of the socketpair fd.
28    Socketpair,
29    // Read end of the pipe.
30    PipeRead,
31    // Write end of the pipe.
32    PipeWrite,
33}
34
35/// One end of a pair of connected unnamed sockets.
36#[derive(Debug)]
37struct AnonSocket {
38    /// The buffer we are reading from, or `None` if this is the writing end of a pipe.
39    /// (In that case, the peer FD will be the reading end of that pipe.)
40    readbuf: Option<RefCell<Buffer>>,
41    /// The `AnonSocket` file descriptor that is our "peer", and that holds the buffer we are
42    /// writing to. This is a weak reference because the other side may be closed before us; all
43    /// future writes will then trigger EPIPE.
44    peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
45    /// Indicates whether the peer has lost data when the file description is closed.
46    /// This flag is set to `true` if the peer's `readbuf` is non-empty at the time
47    /// of closure.
48    peer_lost_data: Cell<bool>,
49    /// A list of thread ids blocked because the buffer was empty.
50    /// Once another thread writes some bytes, these threads will be unblocked.
51    blocked_read_tid: RefCell<Vec<ThreadId>>,
52    /// A list of thread ids blocked because the buffer was full.
53    /// Once another thread reads some bytes, these threads will be unblocked.
54    blocked_write_tid: RefCell<Vec<ThreadId>>,
55    /// Whether this fd is non-blocking or not.
56    is_nonblock: Cell<bool>,
57    // Differentiate between different AnonSocket fd types.
58    fd_type: AnonSocketType,
59}
60
61#[derive(Debug)]
62struct Buffer {
63    buf: VecDeque<u8>,
64    clock: VClock,
65}
66
67impl Buffer {
68    fn new() -> Self {
69        Buffer { buf: VecDeque::new(), clock: VClock::default() }
70    }
71}
72
73impl AnonSocket {
74    fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
75        self.peer_fd.get().unwrap()
76    }
77}
78
79impl FileDescription for AnonSocket {
80    fn name(&self) -> &'static str {
81        match self.fd_type {
82            AnonSocketType::Socketpair => "socketpair",
83            AnonSocketType::PipeRead | AnonSocketType::PipeWrite => "pipe",
84        }
85    }
86
87    fn destroy<'tcx>(
88        self,
89        _self_id: FdId,
90        _communicate_allowed: bool,
91        ecx: &mut MiriInterpCx<'tcx>,
92    ) -> InterpResult<'tcx, io::Result<()>> {
93        if let Some(peer_fd) = self.peer_fd().upgrade() {
94            // If the current readbuf is non-empty when the file description is closed,
95            // notify the peer that data lost has happened in current file description.
96            if let Some(readbuf) = &self.readbuf {
97                if !readbuf.borrow().buf.is_empty() {
98                    peer_fd.peer_lost_data.set(true);
99                }
100            }
101            // Notify peer fd that close has happened, since that can unblock reads and writes.
102            ecx.update_epoll_active_events(peer_fd, /* force_edge */ false)?;
103        }
104        interp_ok(Ok(()))
105    }
106
107    fn read<'tcx>(
108        self: FileDescriptionRef<Self>,
109        _communicate_allowed: bool,
110        ptr: Pointer,
111        len: usize,
112        ecx: &mut MiriInterpCx<'tcx>,
113        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
114    ) -> InterpResult<'tcx> {
115        anonsocket_read(self, ptr, len, ecx, finish)
116    }
117
118    fn write<'tcx>(
119        self: FileDescriptionRef<Self>,
120        _communicate_allowed: bool,
121        ptr: Pointer,
122        len: usize,
123        ecx: &mut MiriInterpCx<'tcx>,
124        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
125    ) -> InterpResult<'tcx> {
126        anonsocket_write(self, ptr, len, ecx, finish)
127    }
128
129    fn short_fd_operations(&self) -> bool {
130        // Pipes guarantee that sufficiently small accesses are not broken apart:
131        // <https://pubs.opengroup.org/onlinepubs/9799919799/functions/write.html#tag_17_699_08>.
132        // For now, we don't bother checking for the size, and just entirely disable
133        // short accesses on pipes.
134        matches!(self.fd_type, AnonSocketType::Socketpair)
135    }
136
137    fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
138        self
139    }
140
141    fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
142        let mut flags = 0;
143
144        // Get flag for file access mode.
145        // The flag for both socketpair and pipe will remain the same even when the peer
146        // fd is closed, so we need to look at the original type of this socket, not at whether
147        // the peer socket still exists.
148        match self.fd_type {
149            AnonSocketType::Socketpair => {
150                flags |= ecx.eval_libc_i32("O_RDWR");
151            }
152            AnonSocketType::PipeRead => {
153                flags |= ecx.eval_libc_i32("O_RDONLY");
154            }
155            AnonSocketType::PipeWrite => {
156                flags |= ecx.eval_libc_i32("O_WRONLY");
157            }
158        }
159
160        // Get flag for blocking status.
161        if self.is_nonblock.get() {
162            flags |= ecx.eval_libc_i32("O_NONBLOCK");
163        }
164
165        interp_ok(Scalar::from_i32(flags))
166    }
167
168    fn set_flags<'tcx>(
169        &self,
170        mut flag: i32,
171        ecx: &mut MiriInterpCx<'tcx>,
172    ) -> InterpResult<'tcx, Scalar> {
173        // FIXME: File creation flags should be ignored.
174
175        let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
176        let o_rdonly = ecx.eval_libc_i32("O_RDONLY");
177        let o_wronly = ecx.eval_libc_i32("O_WRONLY");
178        let o_rdwr = ecx.eval_libc_i32("O_RDWR");
179
180        // O_NONBLOCK flag can be set / unset by user.
181        if flag & o_nonblock == o_nonblock {
182            self.is_nonblock.set(true);
183            flag &= !o_nonblock;
184        } else {
185            self.is_nonblock.set(false);
186        }
187
188        // Ignore all file access mode flags.
189        flag &= !(o_rdonly | o_wronly | o_rdwr);
190
191        // Throw error if there is any unsupported flag.
192        if flag != 0 {
193            throw_unsup_format!(
194                "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
195            )
196        }
197
198        interp_ok(Scalar::from_i32(0))
199    }
200}
201
202/// Write to AnonSocket based on the space available and return the written byte size.
203fn anonsocket_write<'tcx>(
204    self_ref: FileDescriptionRef<AnonSocket>,
205    ptr: Pointer,
206    len: usize,
207    ecx: &mut MiriInterpCx<'tcx>,
208    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
209) -> InterpResult<'tcx> {
210    // Always succeed on write size 0.
211    // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
212    if len == 0 {
213        return finish.call(ecx, Ok(0));
214    }
215
216    // We are writing to our peer's readbuf.
217    let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
218        // If the upgrade from Weak to Rc fails, it indicates that all read ends have been
219        // closed. It is an error to write even if there would be space.
220        return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
221    };
222
223    let Some(writebuf) = &peer_fd.readbuf else {
224        // Writing to the read end of a pipe.
225        return finish.call(ecx, Err(IoError::LibcError("EBADF")));
226    };
227
228    // Let's see if we can write.
229    let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
230    if available_space == 0 {
231        if self_ref.is_nonblock.get() {
232            // Non-blocking socketpair with a full buffer.
233            return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
234        } else {
235            self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
236            // Blocking socketpair with a full buffer.
237            // Block the current thread; only keep a weak ref for this.
238            let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
239            ecx.block_thread(
240                BlockReason::UnnamedSocket,
241                None,
242                callback!(
243                    @capture<'tcx> {
244                        weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
245                        ptr: Pointer,
246                        len: usize,
247                        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
248                    }
249                    |this, unblock: UnblockKind| {
250                        assert_eq!(unblock, UnblockKind::Ready);
251                        // If we got unblocked, then our peer successfully upgraded its weak
252                        // ref to us. That means we can also upgrade our weak ref.
253                        let self_ref = weak_self_ref.upgrade().unwrap();
254                        anonsocket_write(self_ref, ptr, len, this, finish)
255                    }
256                ),
257            );
258        }
259    } else {
260        // There is space to write!
261        let mut writebuf = writebuf.borrow_mut();
262        // Remember this clock so `read` can synchronize with us.
263        ecx.release_clock(|clock| {
264            writebuf.clock.join(clock);
265        })?;
266        // Do full write / partial write based on the space available.
267        let write_size = len.min(available_space);
268        let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
269        assert_eq!(actual_write_size, write_size);
270
271        // Need to stop accessing peer_fd so that it can be notified.
272        drop(writebuf);
273
274        // Unblock all threads that are currently blocked on peer_fd's read.
275        let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
276        // FIXME: We can randomize the order of unblocking.
277        for thread_id in waiting_threads {
278            ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
279        }
280        // Notify epoll waiters: we might be no longer writable, peer might now be readable.
281        // The notification to the peer seems to be always sent on Linux, even if the
282        // FD was readable before.
283        ecx.update_epoll_active_events(self_ref, /* force_edge */ false)?;
284        ecx.update_epoll_active_events(peer_fd, /* force_edge */ true)?;
285
286        return finish.call(ecx, Ok(write_size));
287    }
288    interp_ok(())
289}
290
291/// Read from AnonSocket and return the number of bytes read.
292fn anonsocket_read<'tcx>(
293    self_ref: FileDescriptionRef<AnonSocket>,
294    ptr: Pointer,
295    len: usize,
296    ecx: &mut MiriInterpCx<'tcx>,
297    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
298) -> InterpResult<'tcx> {
299    // Always succeed on read size 0.
300    if len == 0 {
301        return finish.call(ecx, Ok(0));
302    }
303
304    let Some(readbuf) = &self_ref.readbuf else {
305        // FIXME: This should return EBADF, but there's no nice way to do that as there's no
306        // corresponding ErrorKind variant.
307        throw_unsup_format!("reading from the write end of a pipe")
308    };
309
310    if readbuf.borrow_mut().buf.is_empty() {
311        if self_ref.peer_fd().upgrade().is_none() {
312            // Socketpair with no peer and empty buffer.
313            // 0 bytes successfully read indicates end-of-file.
314            return finish.call(ecx, Ok(0));
315        } else if self_ref.is_nonblock.get() {
316            // Non-blocking socketpair with writer and empty buffer.
317            // https://linux.die.net/man/2/read
318            // EAGAIN or EWOULDBLOCK can be returned for socket,
319            // POSIX.1-2001 allows either error to be returned for this case.
320            // Since there is no ErrorKind for EAGAIN, WouldBlock is used.
321            return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
322        } else {
323            self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
324            // Blocking socketpair with writer and empty buffer.
325            // Block the current thread; only keep a weak ref for this.
326            let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
327            ecx.block_thread(
328                BlockReason::UnnamedSocket,
329                None,
330                callback!(
331                    @capture<'tcx> {
332                        weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
333                        ptr: Pointer,
334                        len: usize,
335                        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
336                    }
337                    |this, unblock: UnblockKind| {
338                        assert_eq!(unblock, UnblockKind::Ready);
339                        // If we got unblocked, then our peer successfully upgraded its weak
340                        // ref to us. That means we can also upgrade our weak ref.
341                        let self_ref = weak_self_ref.upgrade().unwrap();
342                        anonsocket_read(self_ref, ptr, len, this, finish)
343                    }
344                ),
345            );
346        }
347    } else {
348        // There's data to be read!
349        let mut readbuf = readbuf.borrow_mut();
350        // Synchronize with all previous writes to this buffer.
351        // FIXME: this over-synchronizes; a more precise approach would be to
352        // only sync with the writes whose data we will read.
353        ecx.acquire_clock(&readbuf.clock)?;
354
355        // Do full read / partial read based on the space available.
356        // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
357        let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
358        let readbuf_now_empty = readbuf.buf.is_empty();
359
360        // Need to drop before others can access the readbuf again.
361        drop(readbuf);
362
363        // A notification should be provided for the peer file description even when it can
364        // only write 1 byte. This implementation is not compliant with the actual Linux kernel
365        // implementation. For optimization reasons, the kernel will only mark the file description
366        // as "writable" when it can write more than a certain number of bytes. Since we
367        // don't know what that *certain number* is, we will provide a notification every time
368        // a read is successful. This might result in our epoll emulation providing more
369        // notifications than the real system.
370        if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
371            // Unblock all threads that are currently blocked on peer_fd's write.
372            let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
373            // FIXME: We can randomize the order of unblocking.
374            for thread_id in waiting_threads {
375                ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
376            }
377            // Notify epoll waiters: peer is now writable.
378            // Linux seems to always notify the peer if the read buffer is now empty.
379            // (Linux also does that if this was a "big" read, but to avoid some arbitrary
380            // threshold, we do not match that.)
381            ecx.update_epoll_active_events(peer_fd, /* force_edge */ readbuf_now_empty)?;
382        };
383        // Notify epoll waiters: we might be no longer readable.
384        ecx.update_epoll_active_events(self_ref, /* force_edge */ false)?;
385
386        return finish.call(ecx, Ok(read_size));
387    }
388    interp_ok(())
389}
390
391impl UnixFileDescription for AnonSocket {
392    fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
393        // We only check the status of EPOLLIN, EPOLLOUT, EPOLLHUP and EPOLLRDHUP flags.
394        // If other event flags need to be supported in the future, the check should be added here.
395
396        let mut epoll_ready_events = EpollEvents::new();
397
398        // Check if it is readable.
399        if let Some(readbuf) = &self.readbuf {
400            if !readbuf.borrow().buf.is_empty() {
401                epoll_ready_events.epollin = true;
402            }
403        } else {
404            // Without a read buffer, reading never blocks, so we are always ready.
405            epoll_ready_events.epollin = true;
406        }
407
408        // Check if is writable.
409        if let Some(peer_fd) = self.peer_fd().upgrade() {
410            if let Some(writebuf) = &peer_fd.readbuf {
411                let data_size = writebuf.borrow().buf.len();
412                let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
413                if available_space != 0 {
414                    epoll_ready_events.epollout = true;
415                }
416            } else {
417                // Without a write buffer, writing never blocks.
418                epoll_ready_events.epollout = true;
419            }
420        } else {
421            // Peer FD has been closed. This always sets both the RDHUP and HUP flags
422            // as we do not support `shutdown` that could be used to partially close the stream.
423            epoll_ready_events.epollrdhup = true;
424            epoll_ready_events.epollhup = true;
425            // Since the peer is closed, even if no data is available reads will return EOF and
426            // writes will return EPIPE. In other words, they won't block, so we mark this as ready
427            // for read and write.
428            epoll_ready_events.epollin = true;
429            epoll_ready_events.epollout = true;
430            // If there is data lost in peer_fd, set EPOLLERR.
431            if self.peer_lost_data.get() {
432                epoll_ready_events.epollerr = true;
433            }
434        }
435        interp_ok(epoll_ready_events)
436    }
437}
438
439impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
440pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
441    /// For more information on the arguments see the socketpair manpage:
442    /// <https://linux.die.net/man/2/socketpair>
443    fn socketpair(
444        &mut self,
445        domain: &OpTy<'tcx>,
446        type_: &OpTy<'tcx>,
447        protocol: &OpTy<'tcx>,
448        sv: &OpTy<'tcx>,
449    ) -> InterpResult<'tcx, Scalar> {
450        let this = self.eval_context_mut();
451
452        let domain = this.read_scalar(domain)?.to_i32()?;
453        let mut flags = this.read_scalar(type_)?.to_i32()?;
454        let protocol = this.read_scalar(protocol)?.to_i32()?;
455        // This is really a pointer to `[i32; 2]` but we use a ptr-to-first-element representation.
456        let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
457
458        let mut is_sock_nonblock = false;
459
460        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
461        // if there is anything left at the end, that's an unsupported flag.
462        if this.tcx.sess.target.os == Os::Linux {
463            // SOCK_NONBLOCK only exists on Linux.
464            let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
465            let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
466            if flags & sock_nonblock == sock_nonblock {
467                is_sock_nonblock = true;
468                flags &= !sock_nonblock;
469            }
470            if flags & sock_cloexec == sock_cloexec {
471                flags &= !sock_cloexec;
472            }
473        }
474
475        // Fail on unsupported input.
476        // AF_UNIX and AF_LOCAL are synonyms, so we accept both in case
477        // their values differ.
478        if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
479            throw_unsup_format!(
480                "socketpair: domain {:#x} is unsupported, only AF_UNIX \
481                                 and AF_LOCAL are allowed",
482                domain
483            );
484        } else if flags != this.eval_libc_i32("SOCK_STREAM") {
485            throw_unsup_format!(
486                "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
487                                 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
488                flags
489            );
490        } else if protocol != 0 {
491            throw_unsup_format!(
492                "socketpair: socket protocol {protocol} is unsupported, \
493                                 only 0 is allowed",
494            );
495        }
496
497        // Generate file descriptions.
498        let fds = &mut this.machine.fds;
499        let fd0 = fds.new_ref(AnonSocket {
500            readbuf: Some(RefCell::new(Buffer::new())),
501            peer_fd: OnceCell::new(),
502            peer_lost_data: Cell::new(false),
503            blocked_read_tid: RefCell::new(Vec::new()),
504            blocked_write_tid: RefCell::new(Vec::new()),
505            is_nonblock: Cell::new(is_sock_nonblock),
506            fd_type: AnonSocketType::Socketpair,
507        });
508        let fd1 = fds.new_ref(AnonSocket {
509            readbuf: Some(RefCell::new(Buffer::new())),
510            peer_fd: OnceCell::new(),
511            peer_lost_data: Cell::new(false),
512            blocked_read_tid: RefCell::new(Vec::new()),
513            blocked_write_tid: RefCell::new(Vec::new()),
514            is_nonblock: Cell::new(is_sock_nonblock),
515            fd_type: AnonSocketType::Socketpair,
516        });
517
518        // Make the file descriptions point to each other.
519        fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
520        fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
521
522        // Insert the file description to the fd table, generating the file descriptors.
523        let sv0 = fds.insert(fd0);
524        let sv1 = fds.insert(fd1);
525
526        // Return socketpair file descriptors to the caller.
527        let sv0 = Scalar::from_int(sv0, sv.layout.size);
528        let sv1 = Scalar::from_int(sv1, sv.layout.size);
529        this.write_scalar(sv0, &sv)?;
530        this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
531
532        interp_ok(Scalar::from_i32(0))
533    }
534
535    fn pipe2(
536        &mut self,
537        pipefd: &OpTy<'tcx>,
538        flags: Option<&OpTy<'tcx>>,
539    ) -> InterpResult<'tcx, Scalar> {
540        let this = self.eval_context_mut();
541
542        let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
543        let mut flags = match flags {
544            Some(flags) => this.read_scalar(flags)?.to_i32()?,
545            None => 0,
546        };
547
548        let cloexec = this.eval_libc_i32("O_CLOEXEC");
549        let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
550
551        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
552        // if there is anything left at the end, that's an unsupported flag.
553        let mut is_nonblock = false;
554        if flags & o_nonblock == o_nonblock {
555            is_nonblock = true;
556            flags &= !o_nonblock;
557        }
558        // As usual we ignore CLOEXEC.
559        if flags & cloexec == cloexec {
560            flags &= !cloexec;
561        }
562        if flags != 0 {
563            throw_unsup_format!("unsupported flags in `pipe2`");
564        }
565
566        // Generate file descriptions.
567        // pipefd[0] refers to the read end of the pipe.
568        let fds = &mut this.machine.fds;
569        let fd0 = fds.new_ref(AnonSocket {
570            readbuf: Some(RefCell::new(Buffer::new())),
571            peer_fd: OnceCell::new(),
572            peer_lost_data: Cell::new(false),
573            blocked_read_tid: RefCell::new(Vec::new()),
574            blocked_write_tid: RefCell::new(Vec::new()),
575            is_nonblock: Cell::new(is_nonblock),
576            fd_type: AnonSocketType::PipeRead,
577        });
578        let fd1 = fds.new_ref(AnonSocket {
579            readbuf: None,
580            peer_fd: OnceCell::new(),
581            peer_lost_data: Cell::new(false),
582            blocked_read_tid: RefCell::new(Vec::new()),
583            blocked_write_tid: RefCell::new(Vec::new()),
584            is_nonblock: Cell::new(is_nonblock),
585            fd_type: AnonSocketType::PipeWrite,
586        });
587
588        // Make the file descriptions point to each other.
589        fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
590        fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
591
592        // Insert the file description to the fd table, generating the file descriptors.
593        let pipefd0 = fds.insert(fd0);
594        let pipefd1 = fds.insert(fd1);
595
596        // Return file descriptors to the caller.
597        let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
598        let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
599        this.write_scalar(pipefd0, &pipefd)?;
600        this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
601
602        interp_ok(Scalar::from_i32(0))
603    }
604}