1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io::{self, ErrorKind, Read};
8
9use rustc_target::spec::Os;
10
11use crate::concurrency::VClock;
12use crate::shims::files::{
13 EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
14};
15use crate::shims::unix::UnixFileDescription;
16use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
17use crate::*;
18
/// Upper bound, in bytes, on the amount of data queued in a single receive buffer.
///
/// Writers subtract the peer buffer's current length from this limit to find out how much
/// room is left. Despite the name, the same limit is applied to pipes, which share this
/// implementation. `0x3_4000` is 212992 bytes (208 KiB).
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 0x3_4000;
23
/// The flavour of endpoint a socket object stands for.
///
/// The flavour is fixed when the endpoint is created and decides the reported name and
/// access mode of the file description.
#[derive(Debug, PartialEq)]
enum AnonSocketType {
    /// Either end of a `socketpair`; such an end is opened for reading and writing.
    Socketpair,
    /// The end of a pipe that data is read from.
    PipeRead,
    /// The end of a pipe that data is written to.
    PipeWrite,
}
33
34#[derive(Debug)]
36struct AnonSocket {
37 readbuf: Option<RefCell<Buffer>>,
40 peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
44 peer_lost_data: Cell<bool>,
48 blocked_read_tid: RefCell<Vec<ThreadId>>,
51 blocked_write_tid: RefCell<Vec<ThreadId>>,
54 is_nonblock: Cell<bool>,
56 fd_type: AnonSocketType,
58}
59
60#[derive(Debug)]
61struct Buffer {
62 buf: VecDeque<u8>,
63 clock: VClock,
64}
65
66impl Buffer {
67 fn new() -> Self {
68 Buffer { buf: VecDeque::new(), clock: VClock::default() }
69 }
70}
71
72impl AnonSocket {
73 fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
74 self.peer_fd.get().unwrap()
75 }
76}
77
78impl FileDescription for AnonSocket {
79 fn name(&self) -> &'static str {
80 match self.fd_type {
81 AnonSocketType::Socketpair => "socketpair",
82 AnonSocketType::PipeRead | AnonSocketType::PipeWrite => "pipe",
83 }
84 }
85
86 fn destroy<'tcx>(
87 self,
88 _self_id: FdId,
89 _communicate_allowed: bool,
90 ecx: &mut MiriInterpCx<'tcx>,
91 ) -> InterpResult<'tcx, io::Result<()>> {
92 if let Some(peer_fd) = self.peer_fd().upgrade() {
93 if let Some(readbuf) = &self.readbuf {
96 if !readbuf.borrow().buf.is_empty() {
97 peer_fd.peer_lost_data.set(true);
98 }
99 }
100 ecx.update_epoll_active_events(peer_fd, false)?;
102 }
103 interp_ok(Ok(()))
104 }
105
106 fn read<'tcx>(
107 self: FileDescriptionRef<Self>,
108 _communicate_allowed: bool,
109 ptr: Pointer,
110 len: usize,
111 ecx: &mut MiriInterpCx<'tcx>,
112 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
113 ) -> InterpResult<'tcx> {
114 anonsocket_read(self, ptr, len, ecx, finish)
115 }
116
117 fn write<'tcx>(
118 self: FileDescriptionRef<Self>,
119 _communicate_allowed: bool,
120 ptr: Pointer,
121 len: usize,
122 ecx: &mut MiriInterpCx<'tcx>,
123 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
124 ) -> InterpResult<'tcx> {
125 anonsocket_write(self, ptr, len, ecx, finish)
126 }
127
128 fn short_fd_operations(&self) -> bool {
129 matches!(self.fd_type, AnonSocketType::Socketpair)
134 }
135
136 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
137 self
138 }
139
140 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
141 let mut flags = 0;
142
143 match self.fd_type {
148 AnonSocketType::Socketpair => {
149 flags |= ecx.eval_libc_i32("O_RDWR");
150 }
151 AnonSocketType::PipeRead => {
152 flags |= ecx.eval_libc_i32("O_RDONLY");
153 }
154 AnonSocketType::PipeWrite => {
155 flags |= ecx.eval_libc_i32("O_WRONLY");
156 }
157 }
158
159 if self.is_nonblock.get() {
161 flags |= ecx.eval_libc_i32("O_NONBLOCK");
162 }
163
164 interp_ok(Scalar::from_i32(flags))
165 }
166
167 fn set_flags<'tcx>(
168 &self,
169 mut flag: i32,
170 ecx: &mut MiriInterpCx<'tcx>,
171 ) -> InterpResult<'tcx, Scalar> {
172 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
173
174 if flag & o_nonblock == o_nonblock {
176 self.is_nonblock.set(true);
177 flag &= !o_nonblock;
178 } else {
179 self.is_nonblock.set(false);
180 }
181
182 if flag != 0 {
184 throw_unsup_format!(
185 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
186 )
187 }
188
189 interp_ok(Scalar::from_i32(0))
190 }
191}
192
193fn anonsocket_write<'tcx>(
195 self_ref: FileDescriptionRef<AnonSocket>,
196 ptr: Pointer,
197 len: usize,
198 ecx: &mut MiriInterpCx<'tcx>,
199 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
200) -> InterpResult<'tcx> {
201 if len == 0 {
204 return finish.call(ecx, Ok(0));
205 }
206
207 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
209 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
212 };
213
214 let Some(writebuf) = &peer_fd.readbuf else {
215 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
217 };
218
219 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
221 if available_space == 0 {
222 if self_ref.is_nonblock.get() {
223 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
225 } else {
226 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
227 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
230 ecx.block_thread(
231 BlockReason::UnnamedSocket,
232 None,
233 callback!(
234 @capture<'tcx> {
235 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
236 ptr: Pointer,
237 len: usize,
238 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
239 }
240 |this, unblock: UnblockKind| {
241 assert_eq!(unblock, UnblockKind::Ready);
242 let self_ref = weak_self_ref.upgrade().unwrap();
245 anonsocket_write(self_ref, ptr, len, this, finish)
246 }
247 ),
248 );
249 }
250 } else {
251 let mut writebuf = writebuf.borrow_mut();
253 ecx.release_clock(|clock| {
255 writebuf.clock.join(clock);
256 })?;
257 let write_size = len.min(available_space);
259 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
260 assert_eq!(actual_write_size, write_size);
261
262 drop(writebuf);
264
265 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
267 for thread_id in waiting_threads {
269 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
270 }
271 ecx.update_epoll_active_events(self_ref, false)?;
275 ecx.update_epoll_active_events(peer_fd, true)?;
276
277 return finish.call(ecx, Ok(write_size));
278 }
279 interp_ok(())
280}
281
282fn anonsocket_read<'tcx>(
284 self_ref: FileDescriptionRef<AnonSocket>,
285 ptr: Pointer,
286 len: usize,
287 ecx: &mut MiriInterpCx<'tcx>,
288 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
289) -> InterpResult<'tcx> {
290 if len == 0 {
292 return finish.call(ecx, Ok(0));
293 }
294
295 let Some(readbuf) = &self_ref.readbuf else {
296 throw_unsup_format!("reading from the write end of a pipe")
299 };
300
301 if readbuf.borrow_mut().buf.is_empty() {
302 if self_ref.peer_fd().upgrade().is_none() {
303 return finish.call(ecx, Ok(0));
306 } else if self_ref.is_nonblock.get() {
307 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
313 } else {
314 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
315 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
318 ecx.block_thread(
319 BlockReason::UnnamedSocket,
320 None,
321 callback!(
322 @capture<'tcx> {
323 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
324 ptr: Pointer,
325 len: usize,
326 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
327 }
328 |this, unblock: UnblockKind| {
329 assert_eq!(unblock, UnblockKind::Ready);
330 let self_ref = weak_self_ref.upgrade().unwrap();
333 anonsocket_read(self_ref, ptr, len, this, finish)
334 }
335 ),
336 );
337 }
338 } else {
339 let mut readbuf = readbuf.borrow_mut();
341 ecx.acquire_clock(&readbuf.clock)?;
345
346 let read_size = ecx.read_from_host(|buf| readbuf.buf.read(buf), len, ptr)?.unwrap();
349 let readbuf_now_empty = readbuf.buf.is_empty();
350
351 drop(readbuf);
353
354 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
362 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
364 for thread_id in waiting_threads {
366 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
367 }
368 ecx.update_epoll_active_events(peer_fd, readbuf_now_empty)?;
373 };
374 ecx.update_epoll_active_events(self_ref, false)?;
376
377 return finish.call(ecx, Ok(read_size));
378 }
379 interp_ok(())
380}
381
382impl UnixFileDescription for AnonSocket {
383 fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
384 let mut epoll_ready_events = EpollEvents::new();
388
389 if let Some(readbuf) = &self.readbuf {
391 if !readbuf.borrow().buf.is_empty() {
392 epoll_ready_events.epollin = true;
393 }
394 } else {
395 epoll_ready_events.epollin = true;
397 }
398
399 if let Some(peer_fd) = self.peer_fd().upgrade() {
401 if let Some(writebuf) = &peer_fd.readbuf {
402 let data_size = writebuf.borrow().buf.len();
403 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
404 if available_space != 0 {
405 epoll_ready_events.epollout = true;
406 }
407 } else {
408 epoll_ready_events.epollout = true;
410 }
411 } else {
412 epoll_ready_events.epollrdhup = true;
415 epoll_ready_events.epollhup = true;
416 epoll_ready_events.epollin = true;
420 epoll_ready_events.epollout = true;
421 if self.peer_lost_data.get() {
423 epoll_ready_events.epollerr = true;
424 }
425 }
426 interp_ok(epoll_ready_events)
427 }
428}
429
430impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
431pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
432 fn socketpair(
435 &mut self,
436 domain: &OpTy<'tcx>,
437 type_: &OpTy<'tcx>,
438 protocol: &OpTy<'tcx>,
439 sv: &OpTy<'tcx>,
440 ) -> InterpResult<'tcx, Scalar> {
441 let this = self.eval_context_mut();
442
443 let domain = this.read_scalar(domain)?.to_i32()?;
444 let mut flags = this.read_scalar(type_)?.to_i32()?;
445 let protocol = this.read_scalar(protocol)?.to_i32()?;
446 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
448
449 let mut is_sock_nonblock = false;
450
451 if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android) {
454 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
456 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
457 if flags & sock_nonblock == sock_nonblock {
458 is_sock_nonblock = true;
459 flags &= !sock_nonblock;
460 }
461 if flags & sock_cloexec == sock_cloexec {
462 flags &= !sock_cloexec;
463 }
464 }
465
466 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
470 throw_unsup_format!(
471 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
472 and AF_LOCAL are allowed",
473 domain
474 );
475 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
476 throw_unsup_format!(
477 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
478 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
479 flags
480 );
481 } else if protocol != 0 {
482 throw_unsup_format!(
483 "socketpair: socket protocol {protocol} is unsupported, \
484 only 0 is allowed",
485 );
486 }
487
488 let fds = &mut this.machine.fds;
490 let fd0 = fds.new_ref(AnonSocket {
491 readbuf: Some(RefCell::new(Buffer::new())),
492 peer_fd: OnceCell::new(),
493 peer_lost_data: Cell::new(false),
494 blocked_read_tid: RefCell::new(Vec::new()),
495 blocked_write_tid: RefCell::new(Vec::new()),
496 is_nonblock: Cell::new(is_sock_nonblock),
497 fd_type: AnonSocketType::Socketpair,
498 });
499 let fd1 = fds.new_ref(AnonSocket {
500 readbuf: Some(RefCell::new(Buffer::new())),
501 peer_fd: OnceCell::new(),
502 peer_lost_data: Cell::new(false),
503 blocked_read_tid: RefCell::new(Vec::new()),
504 blocked_write_tid: RefCell::new(Vec::new()),
505 is_nonblock: Cell::new(is_sock_nonblock),
506 fd_type: AnonSocketType::Socketpair,
507 });
508
509 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
511 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
512
513 let sv0 = fds.insert(fd0);
515 let sv1 = fds.insert(fd1);
516
517 let sv0 = Scalar::from_int(sv0, sv.layout.size);
519 let sv1 = Scalar::from_int(sv1, sv.layout.size);
520 this.write_scalar(sv0, &sv)?;
521 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
522
523 interp_ok(Scalar::from_i32(0))
524 }
525
526 fn pipe2(
527 &mut self,
528 pipefd: &OpTy<'tcx>,
529 flags: Option<&OpTy<'tcx>>,
530 ) -> InterpResult<'tcx, Scalar> {
531 let this = self.eval_context_mut();
532
533 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
534 let mut flags = match flags {
535 Some(flags) => this.read_scalar(flags)?.to_i32()?,
536 None => 0,
537 };
538
539 let cloexec = this.eval_libc_i32("O_CLOEXEC");
540 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
541
542 let mut is_nonblock = false;
545 if flags & o_nonblock == o_nonblock {
546 is_nonblock = true;
547 flags &= !o_nonblock;
548 }
549 if flags & cloexec == cloexec {
551 flags &= !cloexec;
552 }
553 if flags != 0 {
554 throw_unsup_format!("unsupported flags in `pipe2`");
555 }
556
557 let fds = &mut this.machine.fds;
560 let fd0 = fds.new_ref(AnonSocket {
561 readbuf: Some(RefCell::new(Buffer::new())),
562 peer_fd: OnceCell::new(),
563 peer_lost_data: Cell::new(false),
564 blocked_read_tid: RefCell::new(Vec::new()),
565 blocked_write_tid: RefCell::new(Vec::new()),
566 is_nonblock: Cell::new(is_nonblock),
567 fd_type: AnonSocketType::PipeRead,
568 });
569 let fd1 = fds.new_ref(AnonSocket {
570 readbuf: None,
571 peer_fd: OnceCell::new(),
572 peer_lost_data: Cell::new(false),
573 blocked_read_tid: RefCell::new(Vec::new()),
574 blocked_write_tid: RefCell::new(Vec::new()),
575 is_nonblock: Cell::new(is_nonblock),
576 fd_type: AnonSocketType::PipeWrite,
577 });
578
579 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
581 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
582
583 let pipefd0 = fds.insert(fd0);
585 let pipefd1 = fds.insert(fd1);
586
587 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
589 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
590 this.write_scalar(pipefd0, &pipefd)?;
591 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
592
593 interp_ok(Scalar::from_i32(0))
594 }
595}