1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use rustc_target::spec::Os;
11
12use crate::concurrency::VClock;
13use crate::shims::files::{
14 EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
15};
16use crate::shims::unix::UnixFileDescription;
17use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
18use crate::*;
19
/// Upper bound, in bytes, on how much unread data a single in-memory buffer may hold.
/// Once a buffer holds this many bytes, writers either block or get `WouldBlock`.
/// The value is `0x34000`, i.e. 208 KiB.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 208 * 1024;
24
/// Distinguishes the kinds of endpoint an `AnonSocket` can represent.
///
/// This is a plain fieldless tag, so it is cheap to copy and has total equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AnonSocketType {
    /// Either end of a pair created by `socketpair`; both ends own a read buffer.
    Socketpair,
    /// The end of a pipe that owns the read buffer.
    PipeRead,
    /// The end of a pipe that has no read buffer of its own.
    PipeWrite,
}
34
/// One endpoint of a connected in-memory pair: an end of a `socketpair` or one side of a pipe.
#[derive(Debug)]
struct AnonSocket {
    /// The buffer this endpoint reads from. `None` for the write end of a pipe, which has
    /// nothing to read.
    readbuf: Option<RefCell<Buffer>>,
    /// Weak handle to the other endpoint, whose `readbuf` is the buffer we write into.
    /// It is set exactly once, right after both endpoints have been created. The code in
    /// this file treats a failed upgrade as "the peer has been closed".
    peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
    /// Set to `true` by the peer's `destroy` when the peer goes away while its own
    /// `readbuf` still holds unread data. Reported as `epollerr` once the peer is gone.
    peer_lost_data: Cell<bool>,
    /// Threads blocked reading from this endpoint because `readbuf` was empty.
    /// A successful write by the peer unblocks all of them.
    blocked_read_tid: RefCell<Vec<ThreadId>>,
    /// Threads blocked writing through this endpoint because the peer's buffer was full.
    /// A successful read by the peer unblocks all of them.
    blocked_write_tid: RefCell<Vec<ThreadId>>,
    /// Whether reads and writes on this endpoint fail with `WouldBlock` instead of blocking.
    is_nonblock: Cell<bool>,
    /// Which kind of endpoint this is.
    fd_type: AnonSocketType,
}
60
/// The in-memory byte queue that carries data in one direction of a connection.
#[derive(Debug)]
struct Buffer {
    /// Bytes that have been written but not yet read.
    buf: VecDeque<u8>,
    /// Vector clock that every writer joins its own clock into and that every reader
    /// acquires, so that reads synchronize with the writes that produced the data.
    clock: VClock,
}
66
67impl Buffer {
68 fn new() -> Self {
69 Buffer { buf: VecDeque::new(), clock: VClock::default() }
70 }
71}
72
73impl AnonSocket {
74 fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
75 self.peer_fd.get().unwrap()
76 }
77}
78
79impl FileDescription for AnonSocket {
80 fn name(&self) -> &'static str {
81 match self.fd_type {
82 AnonSocketType::Socketpair => "socketpair",
83 AnonSocketType::PipeRead | AnonSocketType::PipeWrite => "pipe",
84 }
85 }
86
87 fn destroy<'tcx>(
88 self,
89 _self_id: FdId,
90 _communicate_allowed: bool,
91 ecx: &mut MiriInterpCx<'tcx>,
92 ) -> InterpResult<'tcx, io::Result<()>> {
93 if let Some(peer_fd) = self.peer_fd().upgrade() {
94 if let Some(readbuf) = &self.readbuf {
97 if !readbuf.borrow().buf.is_empty() {
98 peer_fd.peer_lost_data.set(true);
99 }
100 }
101 ecx.update_epoll_active_events(peer_fd, false)?;
103 }
104 interp_ok(Ok(()))
105 }
106
107 fn read<'tcx>(
108 self: FileDescriptionRef<Self>,
109 _communicate_allowed: bool,
110 ptr: Pointer,
111 len: usize,
112 ecx: &mut MiriInterpCx<'tcx>,
113 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
114 ) -> InterpResult<'tcx> {
115 anonsocket_read(self, ptr, len, ecx, finish)
116 }
117
118 fn write<'tcx>(
119 self: FileDescriptionRef<Self>,
120 _communicate_allowed: bool,
121 ptr: Pointer,
122 len: usize,
123 ecx: &mut MiriInterpCx<'tcx>,
124 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
125 ) -> InterpResult<'tcx> {
126 anonsocket_write(self, ptr, len, ecx, finish)
127 }
128
129 fn short_fd_operations(&self) -> bool {
130 matches!(self.fd_type, AnonSocketType::Socketpair)
135 }
136
137 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
138 self
139 }
140
141 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
142 let mut flags = 0;
143
144 match self.fd_type {
149 AnonSocketType::Socketpair => {
150 flags |= ecx.eval_libc_i32("O_RDWR");
151 }
152 AnonSocketType::PipeRead => {
153 flags |= ecx.eval_libc_i32("O_RDONLY");
154 }
155 AnonSocketType::PipeWrite => {
156 flags |= ecx.eval_libc_i32("O_WRONLY");
157 }
158 }
159
160 if self.is_nonblock.get() {
162 flags |= ecx.eval_libc_i32("O_NONBLOCK");
163 }
164
165 interp_ok(Scalar::from_i32(flags))
166 }
167
168 fn set_flags<'tcx>(
169 &self,
170 mut flag: i32,
171 ecx: &mut MiriInterpCx<'tcx>,
172 ) -> InterpResult<'tcx, Scalar> {
173 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
174
175 if flag & o_nonblock == o_nonblock {
177 self.is_nonblock.set(true);
178 flag &= !o_nonblock;
179 } else {
180 self.is_nonblock.set(false);
181 }
182
183 if flag != 0 {
185 throw_unsup_format!(
186 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
187 )
188 }
189
190 interp_ok(Scalar::from_i32(0))
191 }
192}
193
194fn anonsocket_write<'tcx>(
196 self_ref: FileDescriptionRef<AnonSocket>,
197 ptr: Pointer,
198 len: usize,
199 ecx: &mut MiriInterpCx<'tcx>,
200 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
201) -> InterpResult<'tcx> {
202 if len == 0 {
205 return finish.call(ecx, Ok(0));
206 }
207
208 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
210 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
213 };
214
215 let Some(writebuf) = &peer_fd.readbuf else {
216 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
218 };
219
220 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
222 if available_space == 0 {
223 if self_ref.is_nonblock.get() {
224 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
226 } else {
227 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
228 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
231 ecx.block_thread(
232 BlockReason::UnnamedSocket,
233 None,
234 callback!(
235 @capture<'tcx> {
236 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
237 ptr: Pointer,
238 len: usize,
239 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
240 }
241 |this, unblock: UnblockKind| {
242 assert_eq!(unblock, UnblockKind::Ready);
243 let self_ref = weak_self_ref.upgrade().unwrap();
246 anonsocket_write(self_ref, ptr, len, this, finish)
247 }
248 ),
249 );
250 }
251 } else {
252 let mut writebuf = writebuf.borrow_mut();
254 ecx.release_clock(|clock| {
256 writebuf.clock.join(clock);
257 })?;
258 let write_size = len.min(available_space);
260 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
261 assert_eq!(actual_write_size, write_size);
262
263 drop(writebuf);
265
266 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
268 for thread_id in waiting_threads {
270 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
271 }
272 ecx.update_epoll_active_events(self_ref, false)?;
276 ecx.update_epoll_active_events(peer_fd, true)?;
277
278 return finish.call(ecx, Ok(write_size));
279 }
280 interp_ok(())
281}
282
283fn anonsocket_read<'tcx>(
285 self_ref: FileDescriptionRef<AnonSocket>,
286 ptr: Pointer,
287 len: usize,
288 ecx: &mut MiriInterpCx<'tcx>,
289 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
290) -> InterpResult<'tcx> {
291 if len == 0 {
293 return finish.call(ecx, Ok(0));
294 }
295
296 let Some(readbuf) = &self_ref.readbuf else {
297 throw_unsup_format!("reading from the write end of a pipe")
300 };
301
302 if readbuf.borrow_mut().buf.is_empty() {
303 if self_ref.peer_fd().upgrade().is_none() {
304 return finish.call(ecx, Ok(0));
307 } else if self_ref.is_nonblock.get() {
308 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
314 } else {
315 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
316 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
319 ecx.block_thread(
320 BlockReason::UnnamedSocket,
321 None,
322 callback!(
323 @capture<'tcx> {
324 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
325 ptr: Pointer,
326 len: usize,
327 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
328 }
329 |this, unblock: UnblockKind| {
330 assert_eq!(unblock, UnblockKind::Ready);
331 let self_ref = weak_self_ref.upgrade().unwrap();
334 anonsocket_read(self_ref, ptr, len, this, finish)
335 }
336 ),
337 );
338 }
339 } else {
340 let mut readbuf = readbuf.borrow_mut();
342 ecx.acquire_clock(&readbuf.clock)?;
346
347 let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
350 let readbuf_now_empty = readbuf.buf.is_empty();
351
352 drop(readbuf);
354
355 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
363 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
365 for thread_id in waiting_threads {
367 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
368 }
369 ecx.update_epoll_active_events(peer_fd, readbuf_now_empty)?;
374 };
375 ecx.update_epoll_active_events(self_ref, false)?;
377
378 return finish.call(ecx, Ok(read_size));
379 }
380 interp_ok(())
381}
382
383impl UnixFileDescription for AnonSocket {
384 fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
385 let mut epoll_ready_events = EpollEvents::new();
389
390 if let Some(readbuf) = &self.readbuf {
392 if !readbuf.borrow().buf.is_empty() {
393 epoll_ready_events.epollin = true;
394 }
395 } else {
396 epoll_ready_events.epollin = true;
398 }
399
400 if let Some(peer_fd) = self.peer_fd().upgrade() {
402 if let Some(writebuf) = &peer_fd.readbuf {
403 let data_size = writebuf.borrow().buf.len();
404 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
405 if available_space != 0 {
406 epoll_ready_events.epollout = true;
407 }
408 } else {
409 epoll_ready_events.epollout = true;
411 }
412 } else {
413 epoll_ready_events.epollrdhup = true;
416 epoll_ready_events.epollhup = true;
417 epoll_ready_events.epollin = true;
421 epoll_ready_events.epollout = true;
422 if self.peer_lost_data.get() {
424 epoll_ready_events.epollerr = true;
425 }
426 }
427 interp_ok(epoll_ready_events)
428 }
429}
430
431impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
432pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
433 fn socketpair(
436 &mut self,
437 domain: &OpTy<'tcx>,
438 type_: &OpTy<'tcx>,
439 protocol: &OpTy<'tcx>,
440 sv: &OpTy<'tcx>,
441 ) -> InterpResult<'tcx, Scalar> {
442 let this = self.eval_context_mut();
443
444 let domain = this.read_scalar(domain)?.to_i32()?;
445 let mut flags = this.read_scalar(type_)?.to_i32()?;
446 let protocol = this.read_scalar(protocol)?.to_i32()?;
447 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
449
450 let mut is_sock_nonblock = false;
451
452 if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android) {
455 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
457 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
458 if flags & sock_nonblock == sock_nonblock {
459 is_sock_nonblock = true;
460 flags &= !sock_nonblock;
461 }
462 if flags & sock_cloexec == sock_cloexec {
463 flags &= !sock_cloexec;
464 }
465 }
466
467 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
471 throw_unsup_format!(
472 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
473 and AF_LOCAL are allowed",
474 domain
475 );
476 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
477 throw_unsup_format!(
478 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
479 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
480 flags
481 );
482 } else if protocol != 0 {
483 throw_unsup_format!(
484 "socketpair: socket protocol {protocol} is unsupported, \
485 only 0 is allowed",
486 );
487 }
488
489 let fds = &mut this.machine.fds;
491 let fd0 = fds.new_ref(AnonSocket {
492 readbuf: Some(RefCell::new(Buffer::new())),
493 peer_fd: OnceCell::new(),
494 peer_lost_data: Cell::new(false),
495 blocked_read_tid: RefCell::new(Vec::new()),
496 blocked_write_tid: RefCell::new(Vec::new()),
497 is_nonblock: Cell::new(is_sock_nonblock),
498 fd_type: AnonSocketType::Socketpair,
499 });
500 let fd1 = fds.new_ref(AnonSocket {
501 readbuf: Some(RefCell::new(Buffer::new())),
502 peer_fd: OnceCell::new(),
503 peer_lost_data: Cell::new(false),
504 blocked_read_tid: RefCell::new(Vec::new()),
505 blocked_write_tid: RefCell::new(Vec::new()),
506 is_nonblock: Cell::new(is_sock_nonblock),
507 fd_type: AnonSocketType::Socketpair,
508 });
509
510 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
512 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
513
514 let sv0 = fds.insert(fd0);
516 let sv1 = fds.insert(fd1);
517
518 let sv0 = Scalar::from_int(sv0, sv.layout.size);
520 let sv1 = Scalar::from_int(sv1, sv.layout.size);
521 this.write_scalar(sv0, &sv)?;
522 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
523
524 interp_ok(Scalar::from_i32(0))
525 }
526
527 fn pipe2(
528 &mut self,
529 pipefd: &OpTy<'tcx>,
530 flags: Option<&OpTy<'tcx>>,
531 ) -> InterpResult<'tcx, Scalar> {
532 let this = self.eval_context_mut();
533
534 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
535 let mut flags = match flags {
536 Some(flags) => this.read_scalar(flags)?.to_i32()?,
537 None => 0,
538 };
539
540 let cloexec = this.eval_libc_i32("O_CLOEXEC");
541 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
542
543 let mut is_nonblock = false;
546 if flags & o_nonblock == o_nonblock {
547 is_nonblock = true;
548 flags &= !o_nonblock;
549 }
550 if flags & cloexec == cloexec {
552 flags &= !cloexec;
553 }
554 if flags != 0 {
555 throw_unsup_format!("unsupported flags in `pipe2`");
556 }
557
558 let fds = &mut this.machine.fds;
561 let fd0 = fds.new_ref(AnonSocket {
562 readbuf: Some(RefCell::new(Buffer::new())),
563 peer_fd: OnceCell::new(),
564 peer_lost_data: Cell::new(false),
565 blocked_read_tid: RefCell::new(Vec::new()),
566 blocked_write_tid: RefCell::new(Vec::new()),
567 is_nonblock: Cell::new(is_nonblock),
568 fd_type: AnonSocketType::PipeRead,
569 });
570 let fd1 = fds.new_ref(AnonSocket {
571 readbuf: None,
572 peer_fd: OnceCell::new(),
573 peer_lost_data: Cell::new(false),
574 blocked_read_tid: RefCell::new(Vec::new()),
575 blocked_write_tid: RefCell::new(Vec::new()),
576 is_nonblock: Cell::new(is_nonblock),
577 fd_type: AnonSocketType::PipeWrite,
578 });
579
580 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
582 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
583
584 let pipefd0 = fds.insert(fd0);
586 let pipefd1 = fds.insert(fd1);
587
588 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
590 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
591 this.write_scalar(pipefd0, &pipefd)?;
592 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
593
594 interp_ok(Scalar::from_i32(0))
595 }
596}