Skip to main content

miri/shims/unix/
socket.rs

1use std::cell::{Cell, RefCell, RefMut};
2use std::io;
3use std::io::Read;
4use std::net::{Ipv4Addr, Shutdown, SocketAddr, SocketAddrV4};
5use std::sync::atomic::AtomicBool;
6use std::time::Duration;
7
8use mio::event::Source;
9use mio::net::{TcpListener, TcpStream};
10use rustc_abi::Size;
11use rustc_const_eval::interpret::{InterpResult, interp_ok};
12use rustc_middle::throw_unsup_format;
13use rustc_target::spec::Os;
14
15use crate::shims::files::{EvalContextExt as _, FdId, FileDescription, FileDescriptionRef};
16use crate::shims::unix::UnixFileDescription;
17use crate::shims::unix::linux_like::epoll::{EpollReadiness, EvalContextExt as _};
18use crate::shims::unix::socket_address::EvalContextExt as _;
19use crate::*;
20
/// Address family a socket was created with.
#[derive(PartialEq, Debug)]
enum SocketFamily {
    /// IPv4 internet protocols.
    IPv4,
    /// IPv6 internet protocols.
    IPv6,
}
28
/// Lifecycle state of a socket, tracking which syscalls have been applied to it so far.
///
/// The `Listening`, `Connecting` and `Connected` variants each own a mio socket object;
/// `Initial` and `Bound` do not.
#[derive(Debug)]
enum SocketState {
    /// No syscall after `socket` has been made.
    Initial,
    /// The `bind` syscall has been called on the socket.
    /// This is only reachable from the [`SocketState::Initial`] state.
    /// The payload is the address that was passed to `bind`.
    Bound(SocketAddr),
    /// The `listen` syscall has been called on the socket.
    /// This is only reachable from the [`SocketState::Bound`] state.
    Listening(TcpListener),
    /// The `connect` syscall has been called and we weren't yet able
    /// to ensure the connection is established. This is only reachable
    /// from the [`SocketState::Initial`] state.
    Connecting(TcpStream),
    /// The `connect` syscall has been called on the socket and
    /// we ensured that the connection is established, or
    /// the socket was created by the `accept` syscall.
    /// For a socket created using the `connect` syscall, this is
    /// only reachable from the [`SocketState::Connecting`] state.
    Connected(TcpStream),
}
50
/// File description for an IPv4/IPv6 stream socket.
///
/// All mutable parts use interior mutability (`Cell` / `RefCell`) because the methods
/// operating on a socket only receive a shared reference to it.
#[derive(Debug)]
struct Socket {
    /// Family of the socket, used to ensure socket only binds/connects to address of
    /// same family.
    family: SocketFamily,
    /// Current state of the inner socket.
    state: RefCell<SocketState>,
    /// Whether this fd is non-blocking or not.
    /// Toggled through `fcntl` (`O_NONBLOCK`) and the `FIONBIO` ioctl.
    is_non_block: Cell<bool>,
    /// The current blocking I/O readiness of the file description.
    io_readiness: RefCell<BlockingIoSourceReadiness>,
    /// [`Some`] when the socket had an async error which has not yet been fetched via `SO_ERROR`.
    error: RefCell<Option<io::Error>>,
}
65
66impl FileDescription for Socket {
67    fn name(&self) -> &'static str {
68        "socket"
69    }
70
71    fn destroy<'tcx>(
72        self,
73        self_id: FdId,
74        communicate_allowed: bool,
75        ecx: &mut MiriInterpCx<'tcx>,
76    ) -> InterpResult<'tcx, io::Result<()>> {
77        assert!(communicate_allowed, "cannot have `Socket` with isolation enabled!");
78
79        if matches!(
80            &*self.state.borrow(),
81            SocketState::Listening(_) | SocketState::Connecting(_) | SocketState::Connected(_)
82        ) {
83            // There exists an associated host socket so we need to deregister it
84            // from the blocking I/O manager.
85            ecx.machine.blocking_io.deregister(self_id, self)
86        };
87
88        interp_ok(Ok(()))
89    }
90
91    fn read<'tcx>(
92        self: FileDescriptionRef<Self>,
93        communicate_allowed: bool,
94        ptr: Pointer,
95        len: usize,
96        ecx: &mut MiriInterpCx<'tcx>,
97        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
98    ) -> InterpResult<'tcx> {
99        assert!(communicate_allowed, "cannot have `Socket` with isolation enabled!");
100
101        let socket = self;
102
103        ecx.ensure_connected(
104            socket.clone(),
105            !socket.is_non_block.get(),
106            "read",
107            callback!(
108                @capture<'tcx> {
109                    socket: FileDescriptionRef<Socket>,
110                    ptr: Pointer,
111                    len: usize,
112                    finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
113                } |this, result: Result<(), ()>| {
114                    if result.is_err() {
115                        return finish.call(this, Err(LibcError("ENOTCONN")))
116                    }
117
118                    // Since `read` is the same as `recv` with no flags, we just treat
119                    // the `read` as a `recv` here.
120
121                    if socket.is_non_block.get() {
122                        // We have a non-blocking socket and thus don't want to block until
123                        // we can read.
124                        let result = this.try_non_block_recv(&socket, ptr, len, /* should_peek */ false)?;
125                        finish.call(this, result)
126                    } else {
127                        // The socket is in blocking mode and thus the read call should block
128                        // until we can read some bytes from the socket.
129                        this.block_for_recv(socket, ptr, len, /* should_peek */ false, finish)
130                    }
131                }
132            ),
133        )
134    }
135
136    fn write<'tcx>(
137        self: FileDescriptionRef<Self>,
138        communicate_allowed: bool,
139        ptr: Pointer,
140        len: usize,
141        ecx: &mut MiriInterpCx<'tcx>,
142        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
143    ) -> InterpResult<'tcx> {
144        assert!(communicate_allowed, "cannot have `Socket` with isolation enabled!");
145
146        let socket = self;
147
148        ecx.ensure_connected(
149            socket.clone(),
150            !socket.is_non_block.get(),
151            "write",
152            callback!(
153                @capture<'tcx> {
154                    socket: FileDescriptionRef<Socket>,
155                    ptr: Pointer,
156                    len: usize,
157                    finish: DynMachineCallback<'tcx, Result<usize, IoError>>
158                } |this, result: Result<(), ()>| {
159                    if result.is_err() {
160                        return finish.call(this, Err(LibcError("ENOTCONN")))
161                    }
162
163                    // Since `write` is the same as `send` with no flags, we just treat
164                    // the `write` as a `send` here.
165
166                    if socket.is_non_block.get() {
167                        // We have a non-blocking socket and thus don't want to block until
168                        // we can write.
169                        let result = this.try_non_block_send(&socket, ptr, len)?;
170                        return finish.call(this, result)
171                    } else {
172                        // The socket is in blocking mode and thus the write call should block
173                        // until we can write some bytes into the socket.
174                        this.block_for_send(socket, ptr, len, finish)
175                    }
176                }
177            ),
178        )
179    }
180
181    fn short_fd_operations(&self) -> bool {
182        // Linux guarantees that when a read/write on a streaming socket comes back short,
183        // the kernel buffer is empty/full:
184        // See <https://man7.org/linux/man-pages/man7/epoll.7.html> in Q&A section.
185        // So we can't do short reads/writes here.
186        false
187    }
188
189    fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
190        self
191    }
192
193    fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
194        let mut flags = ecx.eval_libc_i32("O_RDWR");
195
196        if self.is_non_block.get() {
197            flags |= ecx.eval_libc_i32("O_NONBLOCK");
198        }
199
200        interp_ok(Scalar::from_i32(flags))
201    }
202
203    fn set_flags<'tcx>(
204        &self,
205        mut flag: i32,
206        ecx: &mut MiriInterpCx<'tcx>,
207    ) -> InterpResult<'tcx, Scalar> {
208        let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
209
210        // O_NONBLOCK flag can be set / unset by user.
211        if flag & o_nonblock == o_nonblock {
212            self.is_non_block.set(true);
213            flag &= !o_nonblock;
214        } else {
215            self.is_non_block.set(false);
216        }
217
218        // Throw error if there is any unsupported flag.
219        if flag != 0 {
220            throw_unsup_format!("fcntl: only O_NONBLOCK is supported for sockets")
221        }
222
223        interp_ok(Scalar::from_i32(0))
224    }
225}
226
227impl UnixFileDescription for Socket {
228    fn ioctl<'tcx>(
229        &self,
230        op: Scalar,
231        arg: Option<&OpTy<'tcx>>,
232        ecx: &mut MiriInterpCx<'tcx>,
233    ) -> InterpResult<'tcx, i32> {
234        assert!(ecx.machine.communicate(), "cannot have `Socket` with isolation enabled!");
235
236        let fionbio = ecx.eval_libc("FIONBIO");
237
238        if op == fionbio {
239            // On these OSes, Rust uses the ioctl, so we trust that it is reasonable and controls
240            // the same internal flag as fcntl.
241            if !matches!(ecx.tcx.sess.target.os, Os::Linux | Os::Android | Os::MacOs | Os::FreeBsd)
242            {
243                // FIONBIO cannot be used to change the blocking mode of a socket on solarish targets:
244                // <https://github.com/rust-lang/rust/commit/dda5c97675b4f5b1f6fdab64606c8a1f21021b0a>
245                // Since there might be more targets which do weird things with this option, we use
246                // an allowlist instead of just denying solarish targets.
247                throw_unsup_format!(
248                    "ioctl: setting FIONBIO on sockets is unsupported on target {}",
249                    ecx.tcx.sess.target.os
250                );
251            }
252
253            let Some(value_ptr) = arg else {
254                throw_ub_format!("ioctl: setting FIONBIO on sockets requires a third argument");
255            };
256            let value = ecx.deref_pointer_as(value_ptr, ecx.machine.layouts.i32)?;
257            let non_block = ecx.read_scalar(&value)?.to_i32()? != 0;
258            self.is_non_block.set(non_block);
259            return interp_ok(0);
260        }
261
262        throw_unsup_format!("ioctl: unsupported operation {op:#x} on socket");
263    }
264
265    fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> {
266        interp_ok(EpollReadiness::from(&*self.io_readiness.borrow()))
267    }
268}
269
// Empty on purpose: the socket shims are all provided as default methods of the
// `EvalContextExt` trait, this impl merely makes them available on the interpreter context.
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
271pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
272    /// For more information on the arguments see the socket manpage:
273    /// <https://linux.die.net/man/2/socket>
274    fn socket(
275        &mut self,
276        domain: &OpTy<'tcx>,
277        type_: &OpTy<'tcx>,
278        protocol: &OpTy<'tcx>,
279    ) -> InterpResult<'tcx, Scalar> {
280        let this = self.eval_context_mut();
281
282        let domain = this.read_scalar(domain)?.to_i32()?;
283        let mut flags = this.read_scalar(type_)?.to_i32()?;
284        let protocol = this.read_scalar(protocol)?.to_i32()?;
285
286        // Reject if isolation is enabled
287        if let IsolatedOp::Reject(reject_with) = this.machine.isolated_op {
288            this.reject_in_isolation("`socket`", reject_with)?;
289            return this.set_last_error_and_return_i32(LibcError("EACCES"));
290        }
291
292        let mut is_sock_nonblock = false;
293
294        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
295        // if there is anything left at the end, that's an unsupported flag.
296        if matches!(
297            this.tcx.sess.target.os,
298            Os::Linux | Os::Android | Os::FreeBsd | Os::Solaris | Os::Illumos
299        ) {
300            // SOCK_NONBLOCK and SOCK_CLOEXEC only exist on Linux, Android, FreeBSD,
301            // Solaris, and Illumos targets.
302            let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
303            let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
304            if flags & sock_nonblock == sock_nonblock {
305                is_sock_nonblock = true;
306                flags &= !sock_nonblock;
307            }
308            if flags & sock_cloexec == sock_cloexec {
309                // We don't support `exec` so we can ignore this.
310                flags &= !sock_cloexec;
311            }
312        }
313
314        let family = if domain == this.eval_libc_i32("AF_INET") {
315            SocketFamily::IPv4
316        } else if domain == this.eval_libc_i32("AF_INET6") {
317            SocketFamily::IPv6
318        } else {
319            throw_unsup_format!(
320                "socket: domain {:#x} is unsupported, only AF_INET and \
321                AF_INET6 are allowed.",
322                domain
323            );
324        };
325
326        if flags != this.eval_libc_i32("SOCK_STREAM") {
327            throw_unsup_format!(
328                "socket: type {:#x} is unsupported, only SOCK_STREAM, \
329                SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
330                flags
331            );
332        }
333        if protocol != 0 && protocol != this.eval_libc_i32("IPPROTO_TCP") {
334            throw_unsup_format!(
335                "socket: socket protocol {protocol} is unsupported, \
336                only IPPROTO_TCP and 0 are allowed"
337            );
338        }
339
340        let fds = &mut this.machine.fds;
341        let fd = fds.new_ref(Socket {
342            family,
343            state: RefCell::new(SocketState::Initial),
344            is_non_block: Cell::new(is_sock_nonblock),
345            io_readiness: RefCell::new(BlockingIoSourceReadiness::empty()),
346            error: RefCell::new(None),
347        });
348
349        interp_ok(Scalar::from_i32(fds.insert(fd)))
350    }
351
352    fn bind(
353        &mut self,
354        socket: &OpTy<'tcx>,
355        address: &OpTy<'tcx>,
356        address_len: &OpTy<'tcx>,
357    ) -> InterpResult<'tcx, Scalar> {
358        let this = self.eval_context_mut();
359
360        let socket = this.read_scalar(socket)?.to_i32()?;
361        let address = match this.read_socket_address(address, address_len, "bind")? {
362            Ok(addr) => addr,
363            Err(e) => return this.set_last_error_and_return_i32(e),
364        };
365
366        // Get the file handle
367        let Some(fd) = this.machine.fds.get(socket) else {
368            return this.set_last_error_and_return_i32(LibcError("EBADF"));
369        };
370
371        let Some(socket) = fd.downcast::<Socket>() else {
372            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
373            return this.set_last_error_and_return_i32(LibcError("ENOTSOCK"));
374        };
375
376        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
377
378        let mut state = socket.state.borrow_mut();
379
380        match *state {
381            SocketState::Initial => {
382                let address_family = match &address {
383                    SocketAddr::V4(_) => SocketFamily::IPv4,
384                    SocketAddr::V6(_) => SocketFamily::IPv6,
385                };
386
387                if socket.family != address_family {
388                    // Attempted to bind an address from a family that doesn't match
389                    // the family of the socket.
390                    let err = if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android) {
391                        // Linux man page states that `EINVAL` is used when there is an address family mismatch.
392                        // See <https://man7.org/linux/man-pages/man2/bind.2.html>
393                        LibcError("EINVAL")
394                    } else {
395                        // POSIX man page states that `EAFNOSUPPORT` should be used when there is an address
396                        // family mismatch.
397                        // See <https://man7.org/linux/man-pages/man3/bind.3p.html>
398                        LibcError("EAFNOSUPPORT")
399                    };
400                    return this.set_last_error_and_return_i32(err);
401                }
402
403                *state = SocketState::Bound(address);
404            }
405            SocketState::Connecting(_) | SocketState::Connected(_) =>
406                throw_unsup_format!(
407                    "bind: socket is already connected and binding a
408                    connected socket is unsupported"
409                ),
410            SocketState::Bound(_) | SocketState::Listening(_) =>
411                throw_unsup_format!(
412                    "bind: socket is already bound and binding a socket \
413                    multiple times is unsupported"
414                ),
415        }
416
417        interp_ok(Scalar::from_i32(0))
418    }
419
420    fn listen(&mut self, socket: &OpTy<'tcx>, backlog: &OpTy<'tcx>) -> InterpResult<'tcx, Scalar> {
421        let this = self.eval_context_mut();
422
423        let socket = this.read_scalar(socket)?.to_i32()?;
424        // Since the backlog value is just a performance hint we can ignore it.
425        let _backlog = this.read_scalar(backlog)?.to_i32()?;
426
427        // Get the file handle
428        let Some(fd) = this.machine.fds.get(socket) else {
429            return this.set_last_error_and_return_i32(LibcError("EBADF"));
430        };
431
432        let Some(socket) = fd.downcast::<Socket>() else {
433            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
434            return this.set_last_error_and_return_i32(LibcError("ENOTSOCK"));
435        };
436
437        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
438
439        let mut state = socket.state.borrow_mut();
440
441        match *state {
442            SocketState::Bound(socket_addr) =>
443                match TcpListener::bind(socket_addr) {
444                    Ok(listener) => {
445                        *state = SocketState::Listening(listener);
446                        drop(state);
447                        // Register the socket to the blocking I/O manager because
448                        // we now have an associated host socket.
449                        this.machine.blocking_io.register(socket);
450                    }
451                    Err(e) => return this.set_last_error_and_return_i32(e),
452                },
453            SocketState::Initial => {
454                throw_unsup_format!(
455                    "listen: listening on a socket which isn't bound is unsupported"
456                )
457            }
458            SocketState::Listening(_) => {
459                throw_unsup_format!("listen: listening on a socket multiple times is unsupported")
460            }
461            SocketState::Connecting(_) | SocketState::Connected(_) => {
462                throw_unsup_format!("listen: listening on a connected socket is unsupported")
463            }
464        }
465
466        interp_ok(Scalar::from_i32(0))
467    }
468
469    /// For more information on the arguments see the accept manpage:
470    /// <https://linux.die.net/man/2/accept4>
471    fn accept4(
472        &mut self,
473        socket: &OpTy<'tcx>,
474        address: &OpTy<'tcx>,
475        address_len: &OpTy<'tcx>,
476        flags: Option<&OpTy<'tcx>>,
477        // Location where the output scalar is written to.
478        dest: &MPlaceTy<'tcx>,
479    ) -> InterpResult<'tcx> {
480        let this = self.eval_context_mut();
481
482        let socket = this.read_scalar(socket)?.to_i32()?;
483        let address_ptr = this.read_pointer(address)?;
484        let address_len_ptr = this.read_pointer(address_len)?;
485        let mut flags =
486            if let Some(flags) = flags { this.read_scalar(flags)?.to_i32()? } else { 0 };
487
488        // Get the file handle
489        let Some(fd) = this.machine.fds.get(socket) else {
490            return this.set_last_error_and_return(LibcError("EBADF"), dest);
491        };
492
493        let Some(socket) = fd.downcast::<Socket>() else {
494            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
495            return this.set_last_error_and_return(LibcError("ENOTSOCK"), dest);
496        };
497
498        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
499
500        if !matches!(*socket.state.borrow(), SocketState::Listening(_)) {
501            throw_unsup_format!(
502                "accept4: accepting incoming connections is only allowed when socket is listening"
503            )
504        };
505
506        let mut is_client_sock_nonblock = false;
507
508        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
509        // if there is anything left at the end, that's an unsupported flag.
510        if matches!(
511            this.tcx.sess.target.os,
512            Os::Linux | Os::Android | Os::FreeBsd | Os::Solaris | Os::Illumos
513        ) {
514            // SOCK_NONBLOCK and SOCK_CLOEXEC only exist on Linux, Android, FreeBSD,
515            // Solaris, and Illumos targets.
516            let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
517            let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
518            if flags & sock_nonblock == sock_nonblock {
519                is_client_sock_nonblock = true;
520                flags &= !sock_nonblock;
521            }
522            if flags & sock_cloexec == sock_cloexec {
523                // We don't support `exec` so we can ignore this.
524                flags &= !sock_cloexec;
525            }
526        }
527
528        if flags != 0 {
529            throw_unsup_format!(
530                "accept4: flag {flags:#x} is unsupported, only SOCK_CLOEXEC \
531                and SOCK_NONBLOCK are allowed",
532            );
533        }
534
535        if socket.is_non_block.get() {
536            // We have a non-blocking socket and thus don't want to block until
537            // we can accept an incoming connection.
538            match this.try_non_block_accept(
539                &socket,
540                address_ptr,
541                address_len_ptr,
542                is_client_sock_nonblock,
543            )? {
544                Ok(sockfd) => {
545                    // We need to create the scalar using the destination size since
546                    // `syscall(SYS_accept4, ...)` returns a long which doesn't match
547                    // the int returned from the `accept`/`accept4` syscalls.
548                    // See <https://man7.org/linux/man-pages/man2/syscall.2.html>.
549                    this.write_scalar(Scalar::from_int(sockfd, dest.layout.size), dest)
550                }
551                Err(e) => this.set_last_error_and_return(e, dest),
552            }
553        } else {
554            // The socket is in blocking mode and thus the accept call should block
555            // until an incoming connection is ready.
556            this.block_for_accept(
557                socket,
558                address_ptr,
559                address_len_ptr,
560                is_client_sock_nonblock,
561                dest.clone(),
562            )
563        }
564    }
565
566    fn connect(
567        &mut self,
568        socket: &OpTy<'tcx>,
569        address: &OpTy<'tcx>,
570        address_len: &OpTy<'tcx>,
571        // Location where the output scalar is written to.
572        dest: &MPlaceTy<'tcx>,
573    ) -> InterpResult<'tcx> {
574        let this = self.eval_context_mut();
575
576        let socket = this.read_scalar(socket)?.to_i32()?;
577        let address = match this.read_socket_address(address, address_len, "connect")? {
578            Ok(address) => address,
579            Err(e) => return this.set_last_error_and_return(e, dest),
580        };
581
582        // Get the file handle
583        let Some(fd) = this.machine.fds.get(socket) else {
584            return this.set_last_error_and_return(LibcError("EBADF"), dest);
585        };
586
587        let Some(socket) = fd.downcast::<Socket>() else {
588            // Man page specifies to return ENOTSOCK if `fd` is not a socket
589            return this.set_last_error_and_return(LibcError("ENOTSOCK"), dest);
590        };
591
592        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
593
594        match &*socket.state.borrow() {
595            SocketState::Initial => { /* fall-through to below */ }
596            // The socket is already in a connecting state.
597            SocketState::Connecting(_) =>
598                return this.set_last_error_and_return(LibcError("EALREADY"), dest),
599            // We don't return EISCONN for already connected sockets, for which we're
600            // sure that the connection is established, since TCP sockets are usually
601            // allowed to be connected multiple times.
602            _ =>
603                throw_unsup_format!(
604                    "connect: connecting is only supported for sockets which are neither \
605                    bound, listening nor already connected"
606                ),
607        }
608
609        // This begins establishing the connection, but does not block until the stream is fully connected.
610        // We deal with that below.
611        match TcpStream::connect(address) {
612            Ok(stream) => {
613                *socket.state.borrow_mut() = SocketState::Connecting(stream);
614                // Register the socket to the blocking I/O manager because
615                // we now have an associated host socket.
616                this.machine.blocking_io.register(socket.clone());
617            }
618            Err(e) => return this.set_last_error_and_return(e, dest),
619        };
620
621        if socket.is_non_block.get() {
622            // We have a non-blocking socket and thus don't want to block until
623            // the connection is established.
624
625            // Since the [`TcpStream::connect`] function of mio hides the EINPROGRESS
626            // we just always return EINPROGRESS and check whether the connection succeeded
627            // once we want to use the connected socket.
628            this.set_last_error_and_return(LibcError("EINPROGRESS"), dest)
629        } else {
630            // The socket is in blocking mode and thus the connect call should block
631            // until the connection with the server is established.
632
633            let dest = dest.clone();
634
635            this.ensure_connected(
636                socket,
637                /* should_wait */ true,
638                "connect",
639                callback!(
640                    @capture<'tcx> {
641                        dest: MPlaceTy<'tcx>
642                    } |this, result: Result<(), ()>| {
643                        if result.is_err() {
644                            this.set_last_error_and_return(LibcError("ENOTCONN"), &dest)
645                        } else {
646                            this.write_scalar(Scalar::from_i32(0), &dest)
647                        }
648                    }
649                ),
650            )
651        }
652    }
653
654    fn send(
655        &mut self,
656        socket: &OpTy<'tcx>,
657        buffer: &OpTy<'tcx>,
658        length: &OpTy<'tcx>,
659        flags: &OpTy<'tcx>,
660        // Location where the output scalar is written to.
661        dest: &MPlaceTy<'tcx>,
662    ) -> InterpResult<'tcx> {
663        let this = self.eval_context_mut();
664
665        let socket = this.read_scalar(socket)?.to_i32()?;
666        let buffer_ptr = this.read_pointer(buffer)?;
667        let size_layout = this.libc_ty_layout("size_t");
668        let length: usize =
669            this.read_scalar(length)?.to_uint(size_layout.size)?.try_into().unwrap();
670        let mut flags = this.read_scalar(flags)?.to_i32()?;
671
672        // Get the file handle
673        let Some(fd) = this.machine.fds.get(socket) else {
674            return this.set_last_error_and_return(LibcError("EBADF"), dest);
675        };
676
677        let Some(socket) = fd.downcast::<Socket>() else {
678            // Man page specifies to return ENOTSOCK if `fd` is not a socket
679            return this.set_last_error_and_return(LibcError("ENOTSOCK"), dest);
680        };
681
682        let mut is_op_non_block = false;
683
684        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
685        // if there is anything left at the end, that's an unsupported flag.
686        if matches!(
687            this.tcx.sess.target.os,
688            Os::Linux | Os::Android | Os::FreeBsd | Os::Solaris | Os::Illumos
689        ) {
690            // MSG_NOSIGNAL and MSG_DONTWAIT only exist on Linux, Android, FreeBSD,
691            // Solaris, and Illumos targets.
692            let msg_nosignal = this.eval_libc_i32("MSG_NOSIGNAL");
693            let msg_dontwait = this.eval_libc_i32("MSG_DONTWAIT");
694            if flags & msg_nosignal == msg_nosignal {
695                // This is only needed to ensure that no EPIPE signal is sent when
696                // trying to send into a stream which is no longer connected.
697                // Since we don't support signals, we can ignore this.
698                flags &= !msg_nosignal;
699            }
700            if flags & msg_dontwait == msg_dontwait {
701                flags &= !msg_dontwait;
702                is_op_non_block = true;
703            }
704        }
705
706        if flags != 0 {
707            throw_unsup_format!(
708                "send: flag {flags:#x} is unsupported, only MSG_NOSIGNAL and MSG_DONTWAIT are allowed",
709            );
710        }
711
712        // If either the operation or the socket is non-blocking, we don't want
713        // to wait until the connection is established.
714        let should_wait = !is_op_non_block && !socket.is_non_block.get();
715        let dest = dest.clone();
716
717        this.ensure_connected(
718            socket.clone(),
719            should_wait,
720            "send",
721            callback!(
722                @capture<'tcx> {
723                    socket: FileDescriptionRef<Socket>,
724                    flags: i32,
725                    buffer_ptr: Pointer,
726                    length: usize,
727                    is_op_non_block: bool,
728                    dest: MPlaceTy<'tcx>,
729                } |this, result: Result<(), ()>| {
730                    if result.is_err() {
731                        return this.set_last_error_and_return(LibcError("ENOTCONN"), &dest)
732                    }
733
734                    if is_op_non_block || socket.is_non_block.get() {
735                        // We have a non-blocking operation or a non-blocking socket and
736                        // thus don't want to block until we can send.
737                        match this.try_non_block_send(&socket, buffer_ptr, length)? {
738                            Ok(size) => this.write_scalar(Scalar::from_target_isize(size.try_into().unwrap(), this), &dest),
739                            Err(e) => this.set_last_error_and_return(e, &dest),
740                        }
741                    } else {
742                        // The socket is in blocking mode and thus the send call should block
743                        // until we can send some bytes into the socket.
744                        this.block_for_send(
745                            socket,
746                            buffer_ptr,
747                            length,
748                            callback!(@capture<'tcx> {
749                                dest: MPlaceTy<'tcx>
750                            } |this, result: Result<usize, IoError>| {
751                                match result {
752                                    Ok(size) => this.write_scalar(Scalar::from_target_isize(size.try_into().unwrap(), this), &dest),
753                                    Err(e) => this.set_last_error_and_return(e, &dest)
754                                }
755                            }),
756                        )
757                    }
758                }
759            ),
760        )
761    }
762
763    fn recv(
764        &mut self,
765        socket: &OpTy<'tcx>,
766        buffer: &OpTy<'tcx>,
767        length: &OpTy<'tcx>,
768        flags: &OpTy<'tcx>,
769        // Location where the output scalar is written to.
770        dest: &MPlaceTy<'tcx>,
771    ) -> InterpResult<'tcx> {
772        let this = self.eval_context_mut();
773
774        let socket = this.read_scalar(socket)?.to_i32()?;
775        let buffer_ptr = this.read_pointer(buffer)?;
776        let size_layout = this.libc_ty_layout("size_t");
777        let length: usize =
778            this.read_scalar(length)?.to_uint(size_layout.size)?.try_into().unwrap();
779        let mut flags = this.read_scalar(flags)?.to_i32()?;
780
781        // Get the file handle
782        let Some(fd) = this.machine.fds.get(socket) else {
783            return this.set_last_error_and_return(LibcError("EBADF"), dest);
784        };
785
786        let Some(socket) = fd.downcast::<Socket>() else {
787            // Man page specifies to return ENOTSOCK if `fd` is not a socket
788            return this.set_last_error_and_return(LibcError("ENOTSOCK"), dest);
789        };
790
791        let mut should_peek = false;
792        let mut is_op_non_block = false;
793
794        // Interpret the flag. Every flag we recognize is "subtracted" from `flags`, so
795        // if there is anything left at the end, that's an unsupported flag.
796
797        let msg_peek = this.eval_libc_i32("MSG_PEEK");
798        if flags & msg_peek == msg_peek {
799            should_peek = true;
800            flags &= !msg_peek;
801        }
802
803        if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android | Os::FreeBsd | Os::Illumos) {
804            // MSG_CMSG_CLOEXEC only exists on Linux, Android, FreeBSD,
805            // and Illumos targets.
806            let msg_cmsg_cloexec = this.eval_libc_i32("MSG_CMSG_CLOEXEC");
807            if flags & msg_cmsg_cloexec == msg_cmsg_cloexec {
808                // We don't support `exec` so we can ignore this.
809                flags &= !msg_cmsg_cloexec;
810            }
811        }
812
813        if matches!(
814            this.tcx.sess.target.os,
815            Os::Linux | Os::Android | Os::FreeBsd | Os::Solaris | Os::Illumos
816        ) {
817            // MSG_DONTWAIT only exists on Linux, Android, FreeBSD,
818            // Solaris, and Illumos targets.
819            let msg_dontwait = this.eval_libc_i32("MSG_DONTWAIT");
820            if flags & msg_dontwait == msg_dontwait {
821                flags &= !msg_dontwait;
822                is_op_non_block = true;
823            }
824        }
825
826        if flags != 0 {
827            throw_unsup_format!(
828                "recv: flag {flags:#x} is unsupported, only MSG_PEEK, MSG_DONTWAIT \
829                and MSG_CMSG_CLOEXEC are allowed",
830            );
831        }
832
833        // If either the operation or the socket is non-blocking, we don't want
834        // to wait until the connection is established.
835        let should_wait = !is_op_non_block && !socket.is_non_block.get();
836        let dest = dest.clone();
837
838        this.ensure_connected(
839            socket.clone(),
840            should_wait,
841            "recv",
842            callback!(
843                @capture<'tcx> {
844                    socket: FileDescriptionRef<Socket>,
845                    buffer_ptr: Pointer,
846                    length: usize,
847                    should_peek: bool,
848                    is_op_non_block: bool,
849                    dest: MPlaceTy<'tcx>,
850                } |this, result: Result<(), ()>| {
851                    if result.is_err() {
852                        return this.set_last_error_and_return(LibcError("ENOTCONN"), &dest)
853                    }
854
855                    if is_op_non_block || socket.is_non_block.get() {
856                        // We have a non-blocking operation or a non-blocking socket and
857                        // thus don't want to block until we can receive.
858                        match this.try_non_block_recv(&socket, buffer_ptr, length, should_peek)? {
859                            Ok(size) => this.write_scalar(Scalar::from_target_isize(size.try_into().unwrap(), this), &dest),
860                            Err(e) => this.set_last_error_and_return(e, &dest),
861                        }
862                    } else {
863                        // The socket is in blocking mode and thus the receive call should block
864                        // until we can receive some bytes from the socket.
865                        this.block_for_recv(
866                            socket,
867                            buffer_ptr,
868                            length,
869                            should_peek,
870                            callback!(@capture<'tcx> {
871                                dest: MPlaceTy<'tcx>
872                            } |this, result: Result<usize, IoError>| {
873                                match result {
874                                    Ok(size) => this.write_scalar(Scalar::from_target_isize(size.try_into().unwrap(), this), &dest),
875                                    Err(e) => this.set_last_error_and_return(e, &dest)
876                                }
877                            }),
878                        )
879                    }
880                }
881            ),
882        )
883    }
884
885    fn setsockopt(
886        &mut self,
887        socket: &OpTy<'tcx>,
888        level: &OpTy<'tcx>,
889        option_name: &OpTy<'tcx>,
890        option_value: &OpTy<'tcx>,
891        option_len: &OpTy<'tcx>,
892    ) -> InterpResult<'tcx, Scalar> {
893        let this = self.eval_context_mut();
894
895        let socket = this.read_scalar(socket)?.to_i32()?;
896        let level = this.read_scalar(level)?.to_i32()?;
897        let option_name = this.read_scalar(option_name)?.to_i32()?;
898        let socklen_layout = this.libc_ty_layout("socklen_t");
899        let option_len = this.read_scalar(option_len)?.to_int(socklen_layout.size)?;
900
901        // Get the file handle
902        let Some(fd) = this.machine.fds.get(socket) else {
903            return this.set_last_error_and_return_i32(LibcError("EBADF"));
904        };
905
906        let Some(_socket) = fd.downcast::<Socket>() else {
907            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
908            return this.set_last_error_and_return_i32(LibcError("ENOTSOCK"));
909        };
910
911        if level == this.eval_libc_i32("SOL_SOCKET") {
912            let opt_so_reuseaddr = this.eval_libc_i32("SO_REUSEADDR");
913
914            if matches!(this.tcx.sess.target.os, Os::MacOs | Os::FreeBsd | Os::NetBsd) {
915                // SO_NOSIGPIPE only exists on MacOS, FreeBSD, and NetBSD.
916                let opt_so_nosigpipe = this.eval_libc_i32("SO_NOSIGPIPE");
917
918                if option_name == opt_so_nosigpipe {
919                    if option_len != 4 {
920                        // Option value should be C-int which is usually 4 bytes.
921                        return this.set_last_error_and_return_i32(LibcError("EINVAL"));
922                    }
923                    let option_value =
924                        this.deref_pointer_as(option_value, this.machine.layouts.i32)?;
925                    let _val = this.read_scalar(&option_value)?.to_i32()?;
926                    // We entirely ignore this value since we do not support signals anyway.
927
928                    return interp_ok(Scalar::from_i32(0));
929                }
930            }
931
932            if option_name == opt_so_reuseaddr {
933                if option_len != 4 {
934                    // Option value should be C-int which is usually 4 bytes.
935                    return this.set_last_error_and_return_i32(LibcError("EINVAL"));
936                }
937                let option_value = this.deref_pointer_as(option_value, this.machine.layouts.i32)?;
938                let _val = this.read_scalar(&option_value)?.to_i32()?;
939                // We entirely ignore this: std always sets REUSEADDR for us, and in the end it's more of a
940                // hint to bypass some arbitrary timeout anyway.
941                return interp_ok(Scalar::from_i32(0));
942            } else {
943                throw_unsup_format!(
944                    "setsockopt: option {option_name:#x} is unsupported for level SOL_SOCKET",
945                );
946            }
947        }
948
949        throw_unsup_format!(
950            "setsockopt: level {level:#x} is unsupported, only SOL_SOCKET is allowed"
951        );
952    }
953
954    fn getsockopt(
955        &mut self,
956        socket: &OpTy<'tcx>,
957        level: &OpTy<'tcx>,
958        option_name: &OpTy<'tcx>,
959        option_value: &OpTy<'tcx>,
960        option_len: &OpTy<'tcx>,
961    ) -> InterpResult<'tcx, Scalar> {
962        let this = self.eval_context_mut();
963
964        let socket = this.read_scalar(socket)?.to_i32()?;
965        let level = this.read_scalar(level)?.to_i32()?;
966        let option_name = this.read_scalar(option_name)?.to_i32()?;
967        // These two pointers are used to return the value: `len_ptr` initially stores how much space
968        // is available. If the actual value fits into that space, it is written to
969        // `value_ptr` and `len_ptr` is updated to represent how many bytes
970        // were actually written. If the value does not fit, it is silently truncated.
971        // Also see <https://pubs.opengroup.org/onlinepubs/9799919799/functions/getsockopt.html>.
972        let option_value_ptr = this.read_pointer(option_value)?;
973        let option_len_ptr = this.read_pointer(option_len)?;
974
975        // Get the file handle
976        let Some(fd) = this.machine.fds.get(socket) else {
977            return this.set_last_error_and_return_i32(LibcError("EBADF"));
978        };
979
980        let Some(socket) = fd.downcast::<Socket>() else {
981            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
982            return this.set_last_error_and_return_i32(LibcError("ENOTSOCK"));
983        };
984
985        if option_value_ptr == Pointer::null() || option_len_ptr == Pointer::null() {
986            // This socket option returns a value and thus we need to return EFAULT
987            // when either the value or the length pointers are null pointers.
988            return this.set_last_error_and_return_i32(LibcError("EFAULT"));
989        }
990
991        let socklen_layout = this.libc_ty_layout("socklen_t");
992        let option_len_ptr_mplace = this.ptr_to_mplace(option_len_ptr, socklen_layout);
993        let option_len: usize = this
994            .read_scalar(&option_len_ptr_mplace)?
995            .to_int(socklen_layout.size)?
996            .try_into()
997            .unwrap();
998
999        // We need a temporary buffer as `option_value_ptr` might not point to a large enough
1000        // buffer, in which case we have to truncate.
1001        let value_buffer = if level == this.eval_libc_i32("SOL_SOCKET") {
1002            let opt_so_error = this.eval_libc_i32("SO_ERROR");
1003
1004            if option_name == opt_so_error {
1005                // Because `TcpStream::take_error()` and `TcpListener::take_error()` consume the latest async
1006                // error, we know that our stored `socket.error` is outdated when `TcpStream::take_error()`/
1007                // `TcpListener::take_error()` returns `Ok(Some(...))`.
1008                // If they return `Ok(None)`, then we fall back to the stored `socket.error`.
1009                let error = match &*socket.state.borrow() {
1010                    SocketState::Initial | SocketState::Bound(_) => socket.error.take(),
1011                    SocketState::Listening(listener) =>
1012                        listener.take_error().unwrap_or(socket.error.take()),
1013                    SocketState::Connecting(stream) | SocketState::Connected(stream) =>
1014                        stream.take_error().unwrap_or(socket.error.take()),
1015                };
1016                // Clear our own stored error -- it was either `take`n above or it is outdated.
1017                socket.error.replace(None);
1018
1019                // We know there is no longer an async error and thus we need to update the
1020                // I/O and epoll readiness of the socket.
1021                socket.io_readiness.borrow_mut().error = false;
1022                this.update_epoll_active_events(socket, /* force_edge */ false)?;
1023
1024                let return_value = match error {
1025                    Some(err) => this.io_error_to_errnum(err)?.to_i32()?,
1026                    // If there is no error, we write 0 into the option value buffer.
1027                    None => 0,
1028                };
1029
1030                // Allocate new buffer on the stack with the `i32` layout.
1031                let value_buffer = this.allocate(this.machine.layouts.i32, MemoryKind::Stack)?;
1032                this.write_int(return_value, &value_buffer)?;
1033                value_buffer
1034            } else {
1035                throw_unsup_format!(
1036                    "getsockopt: option {option_name:#x} is unsupported for level SOL_SOCKET",
1037                );
1038            }
1039        } else if level == this.eval_libc_i32("IPPROTO_IP") {
1040            let opt_ip_ttl = this.eval_libc_i32("IP_TTL");
1041
1042            if option_name == opt_ip_ttl {
1043                let ttl = match &*socket.state.borrow() {
1044                    SocketState::Initial | SocketState::Bound(_) =>
1045                        throw_unsup_format!(
1046                            "getsockopt: reading option IP_TTL on level IPPROTO_IP is only supported \
1047                            on connected and listening sockets"
1048                        ),
1049                    SocketState::Listening(listener) => listener.ttl(),
1050                    SocketState::Connecting(stream) | SocketState::Connected(stream) =>
1051                        stream.ttl(),
1052                };
1053
1054                let ttl = match ttl {
1055                    Ok(ttl) => ttl,
1056                    Err(e) => return this.set_last_error_and_return_i32(e),
1057                };
1058
1059                // Allocate new buffer on the stack with the `u32` layout.
1060                let value_buffer = this.allocate(this.machine.layouts.u32, MemoryKind::Stack)?;
1061                this.write_int(ttl, &value_buffer)?;
1062                value_buffer
1063            } else {
1064                throw_unsup_format!(
1065                    "getsockopt: option {option_name:#x} is unsupported for level IPPROTO_IP",
1066                );
1067            }
1068        } else {
1069            throw_unsup_format!(
1070                "getsockopt: level {level:#x} is unsupported, only SOL_SOCKET is allowed"
1071            )
1072        };
1073
1074        // Truncated size of the output value.
1075        let output_value_len = value_buffer.layout.size.min(Size::from_bytes(option_len));
1076        // Copy the truncated value into the buffer pointed to by `option_value_ptr`.
1077        this.mem_copy(
1078            value_buffer.ptr(),
1079            option_value_ptr,
1080            // Truncate the value to fit the provided buffer.
1081            output_value_len,
1082            // The buffers are guaranteed to not overlap since the `value_buffer`
1083            // was just newly allocated on the stack.
1084            true,
1085        )?;
1086        // Deallocate the value buffer as it was only needed to store the value and
1087        // copy it into the buffer pointed to by `option_value_ptr`.
1088        this.deallocate_ptr(value_buffer.ptr(), None, MemoryKind::Stack)?;
1089
1090        // On output, the length pointer contains the amount of bytes written -- not the size
1091        // of the value before truncation.
1092        this.write_scalar(
1093            Scalar::from_uint(output_value_len.bytes(), socklen_layout.size),
1094            &option_len_ptr_mplace,
1095        )?;
1096
1097        interp_ok(Scalar::from_i32(0))
1098    }
1099
1100    fn getsockname(
1101        &mut self,
1102        socket: &OpTy<'tcx>,
1103        address: &OpTy<'tcx>,
1104        address_len: &OpTy<'tcx>,
1105    ) -> InterpResult<'tcx, Scalar> {
1106        let this = self.eval_context_mut();
1107
1108        let socket = this.read_scalar(socket)?.to_i32()?;
1109        let address_ptr = this.read_pointer(address)?;
1110        let address_len_ptr = this.read_pointer(address_len)?;
1111
1112        // Get the file handle
1113        let Some(fd) = this.machine.fds.get(socket) else {
1114            return this.set_last_error_and_return_i32(LibcError("EBADF"));
1115        };
1116
1117        let Some(socket) = fd.downcast::<Socket>() else {
1118            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
1119            return this.set_last_error_and_return_i32(LibcError("ENOTSOCK"));
1120        };
1121
1122        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
1123
1124        let state = socket.state.borrow();
1125
1126        let address = match &*state {
1127            SocketState::Bound(address) => {
1128                if address.port() == 0 {
1129                    // The socket is bound to a zero-port which means it gets assigned a random
1130                    // port. Since we don't yet have an underlying socket, we don't know what this
1131                    // random port will be and thus this is unsupported.
1132                    throw_unsup_format!(
1133                        "getsockname: when the port is 0, getting the socket address before \
1134                        calling `listen` or `connect` is unsupported"
1135                    )
1136                }
1137
1138                *address
1139            }
1140            SocketState::Listening(listener) =>
1141                match listener.local_addr() {
1142                    Ok(address) => address,
1143                    Err(e) => return this.set_last_error_and_return_i32(e),
1144                },
1145            SocketState::Connecting(stream) | SocketState::Connected(stream) => {
1146                if cfg!(windows) && matches!(&*state, SocketState::Connecting(_)) {
1147                    // FIXME: On Windows hosts `TcpStream::local_addr` returns `0.0.0.0:0` whilst
1148                    // the socket is connecting:
1149                    // <https://learn.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockname#remarks>
1150                    // This is problematic because UNIX targets could expect a real local address even
1151                    // for a connecting non-blocking socket.
1152
1153                    static DEDUP: AtomicBool = AtomicBool::new(false);
1154                    if !DEDUP.swap(true, std::sync::atomic::Ordering::Relaxed) {
1155                        this.emit_diagnostic(NonHaltingDiagnostic::ConnectingSocketGetsockname);
1156                    }
1157                }
1158                match stream.local_addr() {
1159                    Ok(address) => address,
1160                    Err(e) => return this.set_last_error_and_return_i32(e),
1161                }
1162            }
1163            // For non-bound sockets the POSIX manual says the returned address is unspecified.
1164            // Often this is 0.0.0.0:0 and thus we set it to this value.
1165            SocketState::Initial => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
1166        };
1167
1168        this.write_socket_address(&address, address_ptr, address_len_ptr, "getsockname")
1169            .map(|_| Scalar::from_i32(0))
1170    }
1171
1172    fn getpeername(
1173        &mut self,
1174        socket: &OpTy<'tcx>,
1175        address: &OpTy<'tcx>,
1176        address_len: &OpTy<'tcx>,
1177        // Location where the output scalar is written to.
1178        dest: &MPlaceTy<'tcx>,
1179    ) -> InterpResult<'tcx> {
1180        let this = self.eval_context_mut();
1181
1182        let socket = this.read_scalar(socket)?.to_i32()?;
1183        let address_ptr = this.read_pointer(address)?;
1184        let address_len_ptr = this.read_pointer(address_len)?;
1185
1186        // Get the file handle
1187        let Some(fd) = this.machine.fds.get(socket) else {
1188            return this.set_last_error_and_return(LibcError("EBADF"), dest);
1189        };
1190
1191        let Some(socket) = fd.downcast::<Socket>() else {
1192            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
1193            return this.set_last_error_and_return(LibcError("ENOTSOCK"), dest);
1194        };
1195
1196        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
1197
1198        let dest = dest.clone();
1199
1200        // It's only safe to call [`TcpStream::peer_addr`] after the socket is connected since
1201        // UNIX targets should return ENOTCONN when the connection is not yet established.
1202        this.ensure_connected(
1203            socket.clone(),
1204            /* should_wait */ false,
1205            "getpeername",
1206            callback!(
1207                @capture<'tcx> {
1208                    socket: FileDescriptionRef<Socket>,
1209                    address_ptr: Pointer,
1210                    address_len_ptr: Pointer,
1211                    dest: MPlaceTy<'tcx>,
1212                } |this, result: Result<(), ()>| {
1213                    if result.is_err() {
1214                        return this.set_last_error_and_return(LibcError("ENOTCONN"), &dest)
1215                    };
1216
1217                    let SocketState::Connected(stream) = &*socket.state.borrow() else {
1218                        unreachable!()
1219                    };
1220
1221                    let address = match stream.peer_addr() {
1222                        Ok(address) => address,
1223                        Err(e) => return this.set_last_error_and_return(e, &dest),
1224                    };
1225
1226                    this.write_socket_address(
1227                        &address,
1228                        address_ptr,
1229                        address_len_ptr,
1230                        "getpeername",
1231                    )?;
1232                   this.write_scalar(Scalar::from_i32(0), &dest)
1233                }
1234            ),
1235        )
1236    }
1237
1238    fn shutdown(&mut self, socket: &OpTy<'tcx>, how: &OpTy<'tcx>) -> InterpResult<'tcx, Scalar> {
1239        let this = self.eval_context_mut();
1240
1241        let socket = this.read_scalar(socket)?.to_i32()?;
1242        let how = this.read_scalar(how)?.to_i32()?;
1243
1244        // Get the file handle
1245        let Some(fd) = this.machine.fds.get(socket) else {
1246            return this.set_last_error_and_return_i32(LibcError("EBADF"));
1247        };
1248
1249        let Some(socket) = fd.downcast::<Socket>() else {
1250            // Man page specifies to return ENOTSOCK if `fd` is not a socket.
1251            return this.set_last_error_and_return_i32(LibcError("ENOTSOCK"));
1252        };
1253
1254        assert!(this.machine.communicate(), "cannot have `Socket` with isolation enabled!");
1255
1256        let state = socket.state.borrow();
1257
1258        let (SocketState::Connecting(stream) | SocketState::Connected(stream)) = &*state else {
1259            return this.set_last_error_and_return_i32(LibcError("ENOTCONN"));
1260        };
1261
1262        let is_read_shutdown = how == this.eval_libc_i32("SHUT_RD");
1263        let is_write_shutdown = how == this.eval_libc_i32("SHUT_WR");
1264        let is_read_write_shutdown = how == this.eval_libc_i32("SHUT_RDWR");
1265
1266        let how = match () {
1267            _ if is_read_shutdown => Shutdown::Read,
1268            _ if is_write_shutdown => Shutdown::Write,
1269            _ if is_read_write_shutdown => Shutdown::Both,
1270            // An invalid value was passed to `how`.
1271            _ => return this.set_last_error_and_return_i32(LibcError("EINVAL")),
1272        };
1273
1274        if let Err(e) = stream.shutdown(how) {
1275            return this.set_last_error_and_return_i32(e);
1276        };
1277
1278        drop(state);
1279
1280        // Because we map cross platform mio readiness to epoll readiness and
1281        // the different platforms don't treat `shutdown` the same way, we set
1282        // the readiness after a `shutdown` manually to achieve more consistent
1283        // epoll readiness. Otherwise we do not generate enough epoll events
1284        // on partial shutdowns on Windows hosts.
1285        let mut readiness = socket.io_readiness.borrow_mut();
1286        // Closing the read end of a socket causes an EPOLLRDHUP event.
1287        readiness.read_closed |= is_read_shutdown || is_read_write_shutdown;
1288        // Only shutting down the write end doesn't cause an EPOLLHUP event
1289        // and thus we won't set the `write_closed` readiness for it here.
1290        readiness.write_closed |= is_read_write_shutdown;
1291        // The Linux kernel also sets EPOLLIN when both ends of a socket are closed:
1292        // <https://github.com/torvalds/linux/blob/HEAD/net/ipv4/tcp.c#L584-L588>
1293        readiness.readable |= is_read_write_shutdown;
1294
1295        drop(readiness);
1296
1297        // Update the epoll readiness for the socket.
1298        this.update_epoll_active_events(socket, /* force_edge */ false)?;
1299
1300        interp_ok(Scalar::from_i32(0))
1301    }
1302}
1303
// Makes the private helper methods of `EvalContextPrivExt` available on the
// interpreter context. The impl body is empty, so all of the trait's methods
// must come from the default bodies in the trait definition.
impl<'tcx> EvalContextPrivExt<'tcx> for crate::MiriInterpCx<'tcx> {}
1305trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
1306    /// Block the thread until there's an incoming connection or an error occurred.
1307    ///
1308    /// This recursively calls itself should the operation still block for some reason.
1309    ///
1310    /// **Note**: This function is only safe to call when having previously ensured
1311    /// that the socket is in [`SocketState::Listening`].
1312    fn block_for_accept(
1313        &mut self,
1314        socket: FileDescriptionRef<Socket>,
1315        address_ptr: Pointer,
1316        address_len_ptr: Pointer,
1317        is_client_sock_nonblock: bool,
1318        dest: MPlaceTy<'tcx>,
1319    ) -> InterpResult<'tcx> {
1320        let this = self.eval_context_mut();
1321        this.block_thread_for_io(
1322            socket.clone(),
1323            BlockingIoInterest::Read,
1324            None,
1325            callback!(@capture<'tcx> {
1326                address_ptr: Pointer,
1327                address_len_ptr: Pointer,
1328                is_client_sock_nonblock: bool,
1329                socket: FileDescriptionRef<Socket>,
1330                dest: MPlaceTy<'tcx>,
1331            } |this, kind: UnblockKind| {
1332                assert_eq!(kind, UnblockKind::Ready);
1333
1334                // Remove the blocking I/O interest for unblocking this thread.
1335                this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread());
1336
1337                match this.try_non_block_accept(&socket, address_ptr, address_len_ptr, is_client_sock_nonblock)? {
1338                    Ok(sockfd) => {
1339                        // We need to create the scalar using the destination size since
1340                        // `syscall(SYS_accept4, ...)` returns a long which doesn't match
1341                        // the int returned from the `accept`/`accept4` syscalls.
1342                        // See <https://man7.org/linux/man-pages/man2/syscall.2.html>.
1343                        this.write_scalar(Scalar::from_int(sockfd, dest.layout.size), &dest)
1344                    },
1345                    Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::WouldBlock => {
1346                        // We need to block the thread again as it would still block.
1347                        this.block_for_accept(socket, address_ptr, address_len_ptr, is_client_sock_nonblock, dest)
1348                    }
1349                    Err(e) => this.set_last_error_and_return(e, &dest),
1350                }
1351            }),
1352        )
1353    }
1354
1355    /// Attempt to accept an incoming connection on the listening socket in a
1356    /// non-blocking manner.
1357    ///
1358    /// **Note**: This function is only safe to call when having previously ensured
1359    /// that the socket is in [`SocketState::Listening`].
1360    fn try_non_block_accept(
1361        &mut self,
1362        socket: &FileDescriptionRef<Socket>,
1363        address_ptr: Pointer,
1364        address_len_ptr: Pointer,
1365        is_client_sock_nonblock: bool,
1366    ) -> InterpResult<'tcx, Result<i32, IoError>> {
1367        let this = self.eval_context_mut();
1368
1369        let state = socket.state.borrow();
1370        let SocketState::Listening(listener) = &*state else {
1371            panic!(
1372                "try_non_block_accept must only be called when socket is in `SocketState::Listening`"
1373            )
1374        };
1375
1376        let (stream, addr) = match listener.accept() {
1377            Ok(peer) => peer,
1378            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1379                // We know that the source is not readable so we need to update its readiness.
1380                socket.io_readiness.borrow_mut().readable = false;
1381                this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?;
1382
1383                return interp_ok(Err(IoError::HostError(e)));
1384            }
1385            Err(e) => return interp_ok(Err(IoError::HostError(e))),
1386        };
1387
1388        let family = match addr {
1389            SocketAddr::V4(_) => SocketFamily::IPv4,
1390            SocketAddr::V6(_) => SocketFamily::IPv6,
1391        };
1392
1393        if address_ptr != Pointer::null() {
1394            // We only attempt a write if the address pointer is not a null pointer.
1395            // If the address pointer is a null pointer the user isn't interested in the
1396            // address and we don't need to write anything.
1397            this.write_socket_address(&addr, address_ptr, address_len_ptr, "accept4")?;
1398        }
1399
1400        let fd = this.machine.fds.new_ref(Socket {
1401            family,
1402            state: RefCell::new(SocketState::Connected(stream)),
1403            is_non_block: Cell::new(is_client_sock_nonblock),
1404            io_readiness: RefCell::new(BlockingIoSourceReadiness::empty()),
1405            error: RefCell::new(None),
1406        });
1407        // Register the socket to the blocking I/O manager because
1408        // there is an associated host socket.
1409        this.machine.blocking_io.register(fd.clone());
1410        let sockfd = this.machine.fds.insert(fd);
1411        interp_ok(Ok(sockfd))
1412    }
1413
1414    /// Block the thread until we can send bytes into the connected socket
1415    /// or an error occurred.
1416    ///
1417    /// This recursively calls itself should the operation still block for some reason.
1418    ///
1419    /// **Note**: This function is only safe to call when having previously ensured
1420    /// that the socket is in [`SocketState::Connected`].
1421    fn block_for_send(
1422        &mut self,
1423        socket: FileDescriptionRef<Socket>,
1424        buffer_ptr: Pointer,
1425        length: usize,
1426        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
1427    ) -> InterpResult<'tcx> {
1428        let this = self.eval_context_mut();
1429        this.block_thread_for_io(
1430            socket.clone(),
1431            BlockingIoInterest::Write,
1432            None,
1433            callback!(@capture<'tcx> {
1434                socket: FileDescriptionRef<Socket>,
1435                buffer_ptr: Pointer,
1436                length: usize,
1437                finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
1438            } |this, kind: UnblockKind| {
1439                assert_eq!(kind, UnblockKind::Ready);
1440
1441                // Remove the blocking I/O interest for unblocking this thread.
1442                this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread());
1443
1444                match this.try_non_block_send(&socket, buffer_ptr, length)? {
1445                    Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::WouldBlock => {
1446                        // We need to block the thread again as it would still block.
1447                        this.block_for_send(socket, buffer_ptr, length, finish)
1448                    },
1449                    result => finish.call(this, result)
1450                }
1451            }),
1452        )
1453    }
1454
1455    /// Attempt to send bytes into the connected socket in a non-blocking manner.
1456    ///
1457    /// **Note**: This function is only safe to call when having previously ensured
1458    /// that the socket is in [`SocketState::Connected`].
1459    fn try_non_block_send(
1460        &mut self,
1461        socket: &FileDescriptionRef<Socket>,
1462        buffer_ptr: Pointer,
1463        length: usize,
1464    ) -> InterpResult<'tcx, Result<usize, IoError>> {
1465        let this = self.eval_context_mut();
1466
1467        let SocketState::Connected(stream) = &mut *socket.state.borrow_mut() else {
1468            panic!("try_non_block_send must only be called when the socket is connected")
1469        };
1470
1471        // This is a *non-blocking* write.
1472        let result = this.write_to_host(stream, length, buffer_ptr)?;
1473        match result {
1474            Err(IoError::HostError(e))
1475                if matches!(e.kind(), io::ErrorKind::NotConnected | io::ErrorKind::WouldBlock) =>
1476            {
1477                // We know that the source is not writable so we need to update it's readiness.
1478                socket.io_readiness.borrow_mut().writable = false;
1479                this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?;
1480
1481                // On Windows hosts, `send` can return WSAENOTCONN where EAGAIN or EWOULDBLOCK
1482                // would be returned on UNIX-like systems. We thus remap this error to an EWOULDBLOCK.
1483                interp_ok(Err(IoError::HostError(io::ErrorKind::WouldBlock.into())))
1484            }
1485            Ok(bytes_written) if bytes_written < length => {
1486                // We had a short write. On Unix hosts using the `epoll` and `kqueue` backends, a
1487                // short write means that the write buffer is full. We update the readiness
1488                // accordingly, which means that next time we see "writable" we will report an epoll
1489                // edge. Some applications (e.g. tokio) rely on this behavior; see
1490                // <https://github.com/tokio-rs/tokio/blob/HEAD/tokio/src/io/poll_evented.rs#L244-L264>.
1491                if cfg!(any(
1492                    // epoll
1493                    target_os = "android",
1494                    target_os = "illumos",
1495                    target_os = "linux",
1496                    target_os = "redox",
1497                    // kqueue
1498                    target_os = "dragonfly",
1499                    target_os = "freebsd",
1500                    target_os = "ios",
1501                    target_os = "macos",
1502                    target_os = "netbsd",
1503                    target_os = "openbsd",
1504                    target_os = "tvos",
1505                    target_os = "visionos",
1506                    target_os = "watchos",
1507                )) {
1508                    socket.io_readiness.borrow_mut().writable = false;
1509                    this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?;
1510                } else {
1511                    // On hosts which don't use the `epoll` or `kqueue` backends, a short write
1512                    // doesn't imply a full write buffer. However, the target we are emulating might
1513                    // guarantee this behavior. To prevent applications from being stuck on such
1514                    // targets waiting on a new readiness event, we emit a new edge which still
1515                    // contains a writable readiness. This should trick the applications into trying
1516                    // another write which would then return EWOULDBLOCK should it really be full.
1517                    // This results in an unrealistic execution but we don't have another way of
1518                    // finding out whether the write buffer is full. The "default case" of linux
1519                    // host and linux target isn't affected by this.
1520                    this.update_epoll_active_events(socket.clone(), /* force_edge */ true)?;
1521                }
1522                interp_ok(result)
1523            }
1524            result => interp_ok(result),
1525        }
1526    }
1527
1528    /// Block the thread until we can receive bytes from the connected socket
1529    /// or an error occurred.
1530    ///
1531    /// This recursively calls itself should the operation still block for some reason.
1532    ///
1533    /// **Note**: This function is only safe to call when having previously ensured
1534    /// that the socket is in [`SocketState::Connected`].
1535    fn block_for_recv(
1536        &mut self,
1537        socket: FileDescriptionRef<Socket>,
1538        buffer_ptr: Pointer,
1539        length: usize,
1540        should_peek: bool,
1541        finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
1542    ) -> InterpResult<'tcx> {
1543        let this = self.eval_context_mut();
1544        this.block_thread_for_io(
1545            socket.clone(),
1546            BlockingIoInterest::Read,
1547            None,
1548            callback!(@capture<'tcx> {
1549                socket: FileDescriptionRef<Socket>,
1550                buffer_ptr: Pointer,
1551                length: usize,
1552                should_peek: bool,
1553                finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
1554            } |this, kind: UnblockKind| {
1555                assert_eq!(kind, UnblockKind::Ready);
1556
1557                // Remove the blocking I/O interest for unblocking this thread.
1558                this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread());
1559
1560                match this.try_non_block_recv(&socket, buffer_ptr, length, should_peek)? {
1561                    Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::WouldBlock => {
1562                        // We need to block the thread again as it would still block.
1563                        this.block_for_recv(socket, buffer_ptr, length, should_peek, finish)
1564                    },
1565                    result => finish.call(this, result)
1566                }
1567            }),
1568        )
1569    }
1570
1571    /// Attempt to receive bytes from the connected socket in a non-blocking manner.
1572    ///
1573    /// **Note**: This function is only safe to call when having previously ensured
1574    /// that the socket is in [`SocketState::Connected`].
1575    fn try_non_block_recv(
1576        &mut self,
1577        socket: &FileDescriptionRef<Socket>,
1578        buffer_ptr: Pointer,
1579        length: usize,
1580        should_peek: bool,
1581    ) -> InterpResult<'tcx, Result<usize, IoError>> {
1582        let this = self.eval_context_mut();
1583
1584        let SocketState::Connected(stream) = &mut *socket.state.borrow_mut() else {
1585            panic!("try_non_block_recv must only be called when the socket is connected")
1586        };
1587
1588        // This is a *non-blocking* read/peek.
1589        let result = this.read_from_host(
1590            |buf| {
1591                if should_peek { stream.peek(buf) } else { stream.read(buf) }
1592            },
1593            length,
1594            buffer_ptr,
1595        )?;
1596        match result {
1597            Err(IoError::HostError(e))
1598                if matches!(e.kind(), io::ErrorKind::NotConnected | io::ErrorKind::WouldBlock) =>
1599            {
1600                // We know that the source is not readable so we need to update it's readiness.
1601                socket.io_readiness.borrow_mut().readable = false;
1602                this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?;
1603
1604                // On Windows hosts, `recv` can return WSAENOTCONN where EAGAIN or EWOULDBLOCK
1605                // would be returned on UNIX-like systems. We thus remap this error to an EWOULDBLOCK.
1606                interp_ok(Err(IoError::HostError(io::ErrorKind::WouldBlock.into())))
1607            }
1608            Ok(bytes_read) if bytes_read < length && bytes_read > 0 => {
1609                // We had a short read. (Note that reading 0 bytes is guaranteed to indicate EOF,
1610                // and can never happen spuriously, so we have to exclude that case.) On Unix hosts
1611                // using the `epoll` and `kqueue` backends, a short read means that the read buffer
1612                // is empty. We update the readiness accordingly, which means that next time we see
1613                // "readable" we will report an epoll edge. Some applications (e.g. tokio) rely on
1614                // this behavior; see
1615                // <https://github.com/tokio-rs/tokio/blob/HEAD/tokio/src/io/poll_evented.rs#L190-L210>
1616                if cfg!(any(
1617                    // epoll
1618                    target_os = "android",
1619                    target_os = "illumos",
1620                    target_os = "linux",
1621                    target_os = "redox",
1622                    // kqueue
1623                    target_os = "dragonfly",
1624                    target_os = "freebsd",
1625                    target_os = "ios",
1626                    target_os = "macos",
1627                    target_os = "netbsd",
1628                    target_os = "openbsd",
1629                    target_os = "tvos",
1630                    target_os = "visionos",
1631                    target_os = "watchos",
1632                )) {
1633                    socket.io_readiness.borrow_mut().readable = false;
1634                    this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?;
1635                } else {
1636                    // On hosts which don't use the `epoll` or `kqueue` backends, a short read
1637                    // doesn't imply an empty read buffer. However, the target we are emulating
1638                    // might guarantee this behavior. To prevent applications from being stuck on
1639                    // such targets waiting on a new readiness event, we emit a new edge which still
1640                    // contains a readable readiness. This should trick the applications into trying
1641                    // another read which would then return EWOULDBLOCK should it really be empty.
1642                    // This results in an unrealistic execution but we don't have another way of
1643                    // finding out whether the read buffer is empty. The "default case" of linux
1644                    // host and linux target isn't affected by this.
1645                    this.update_epoll_active_events(socket.clone(), /* force_edge */ true)?;
1646                }
1647                interp_ok(result)
1648            }
1649            result => interp_ok(result),
1650        }
1651    }
1652
    /// Execute the provided callback function when the socket is either in
    /// [`SocketState::Connected`] or an error occurred.
    ///
    /// If the socket is currently neither in the [`SocketState::Connecting`] nor
    /// the [`SocketState::Connected`] state, `action` is called with `Err(())`
    /// right away (callers presumably report this as ENOTCONN).
    /// When the callback function is called with `Ok(_)`, then we're guaranteed
    /// that the socket is in the [`SocketState::Connected`] state.
    ///
    /// This function can optionally also block until either an error occurred or
    /// the socket reached the [`SocketState::Connected`] state.
    ///
    /// # Parameters
    ///
    /// - `socket`: the socket whose connection state is checked (and possibly upgraded
    ///   from `Connecting` to `Connected`).
    /// - `should_wait`: when `true` and the socket is still connecting, the thread blocks
    ///   without a timeout until the socket reports readiness; when `false`, a zero-duration
    ///   timeout is used so that `action` receives `Err(())` if the connection isn't
    ///   established yet.
    /// - `foreign_name`: captured by the callback but not referenced in its body.
    /// - `action`: continuation invoked exactly once with `Ok(())` (socket is connected)
    ///   or `Err(())` (socket is not connected).
    ///
    /// If establishing the connection failed, the host error is stored in `socket.error`
    /// (for a later `getsockopt(SOL_SOCKET, SO_ERROR, ...)`) and the socket is reset to
    /// [`SocketState::Initial`] before `action` is called with `Err(())`.
    fn ensure_connected(
        &mut self,
        socket: FileDescriptionRef<Socket>,
        should_wait: bool,
        foreign_name: &'static str,
        action: DynMachineCallback<'tcx, Result<(), ()>>,
    ) -> InterpResult<'tcx> {
        let this = self.eval_context_mut();

        // Fast paths: everything except `Connecting` can be answered immediately.
        // The borrow is released before `action` runs, presumably so that the
        // callback can freely access `socket.state` itself.
        let state = socket.state.borrow();
        match &*state {
            SocketState::Connecting(_) => { /* fall-through to below */ }
            SocketState::Connected(_) => {
                drop(state);
                return action.call(this, Ok(()));
            }
            _ => {
                drop(state);
                return action.call(this, Err(()));
            }
        };

        drop(state);

        // We're currently connecting. Since the underlying mio socket is non-blocking,
        // the only way to determine whether we are done connecting is by polling.
        // If we should wait until the connection is established, the timeout is `None`.
        // Otherwise, we use a zero duration timeout, i.e. we return immediately
        // (but we still go through the scheduler once -- which is fine).
        let timeout = if should_wait {
            None
        } else {
            Some((TimeoutClock::Monotonic, TimeoutAnchor::Absolute, Duration::ZERO))
        };

        // A connecting socket signals completion (or failure) of the connection
        // attempt through a writable/error event, hence the write interest.
        this.block_thread_for_io(
            socket.clone(),
            BlockingIoInterest::Write,
            timeout,
            callback!(
                @capture<'tcx> {
                    socket: FileDescriptionRef<Socket>,
                    should_wait: bool,
                    foreign_name: &'static str,
                    action: DynMachineCallback<'tcx, Result<(), ()>>,
                } |this, kind: UnblockKind| {
                    // Remove the blocking I/O interest for unblocking this thread.
                    this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread());

                    if UnblockKind::TimedOut == kind {
                        // We can only time out when `should_wait` is false.
                        // This then means that the socket is not yet connected.
                        assert!(!should_wait);
                        return action.call(this, Err(()))
                    }

                    // The thread woke up because it's ready, indicating a writeable or error event.

                    // The state has to be re-checked here: other threads may have run
                    // (and changed it) while this thread was blocked.
                    let mut state = socket.state.borrow_mut();
                    let stream = match &*state {
                        SocketState::Connecting(stream) => stream,
                        SocketState::Connected(_) => {
                            drop(state);
                            // This can happen because we blocked the thread:
                            // maybe another thread "upgraded" the connection in the meantime.
                            return action.call(this, Ok(()))
                        },
                        _ => {
                            drop(state);
                            // We ensured that we only block when we're currently connecting.
                            // Since this thread just got rescheduled, it could be that another
                            // thread realized that the connection failed and we're thus in
                            // an "invalid state".
                            return action.call(this, Err(()))
                        }
                    };

                    // Manually check whether there were any errors since calling `connect`.
                    // NOTE(review): if `take_error` itself fails (`Err(_)`), this pattern does
                    // not match and the socket is treated as successfully connected below.
                    if let Ok(Some(err)) = stream.take_error() {
                        // There was an error during connecting and thus we
                        // return ENOTCONN. It's the program's responsibility
                        // to read SO_ERROR itself.

                        // Store the error such that we can return it when
                        // `getsockopt(SOL_SOCKET, SO_ERROR, ...)` is called on the socket.
                        socket.error.replace(Some(err));

                        // Go back to initial state since the only way of getting into the
                        // `Connecting` state is from the `Initial` state and at this point
                        // we know that the connection won't be established anymore.
                        *state = SocketState::Initial;
                        drop(state);
                        return action.call(this, Err(()))
                    }

                    // There was no error during connecting. Mio advises also reading the peer address
                    // to ensure that socket is actually connected and that it wasn't a spurious wake-up:
                    // <https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#notes>
                    //
                    // Attempting to read the peer address would introduce an edge-case where the
                    // write end of the socket could already be shutdown before it received a
                    // writable event. When we then call [`TcpStream::peer_addr`] we receive an
                    // error. This would need extra state for storing whether the write end was
                    // manually closed using `shutdown`.
                    // Also, tokio doesn't read the peer address and everything seems to be fine,
                    // so we don't do that either:
                    // <https://github.com/tokio-rs/mio/issues/1942#issuecomment-4162607761>
                    // In other words, we are assuming that there will be no spurious
                    // wakeups while establishing the connection.

                    // The connection is established.

                    // Temporarily use dummy state to take ownership of the stream.
                    let SocketState::Connecting(stream) = std::mem::replace(&mut*state, SocketState::Initial) else {
                        // The match above already ensured that we're currently connecting.
                        unreachable!()
                    };
                    *state = SocketState::Connected(stream);
                    drop(state);
                    action.call(this, Ok(()))
                }
            ),
        )
    }
1786}
1787
1788impl VisitProvenance for FileDescriptionRef<Socket> {
1789    // A socket doesn't contain any references to machine memory
1790    // and thus we don't need to propagate the visit.
1791    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
1792}
1793
1794impl SourceFileDescription for Socket {
1795    fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()> {
1796        let mut state = self.state.borrow_mut();
1797        match &mut *state {
1798            SocketState::Listening(listener) => f(listener),
1799            SocketState::Connecting(stream) | SocketState::Connected(stream) => f(stream),
1800            // We never try adding a socket which is not backed by a real socket to the poll registry.
1801            _ => unreachable!(),
1802        }
1803    }
1804
1805    fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness> {
1806        self.io_readiness.borrow_mut()
1807    }
1808}