1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io::{self, ErrorKind, Read};
8
9use rustc_target::spec::Os;
10
11use crate::concurrency::VClock;
12use crate::shims::files::{
13 EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
14};
15use crate::shims::unix::UnixFileDescription;
16use crate::shims::unix::linux_like::epoll::{EpollReadiness, EvalContextExt as _};
17use crate::*;
18
/// Upper bound, in bytes, on the amount of unread data a single read buffer may hold
/// (0x34000 = 212992 bytes). A write only proceeds while the peer's buffer holds fewer
/// bytes than this.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 0x34000;
23
/// Which kind of endpoint a `VirtualSocket` represents.
#[derive(Debug, PartialEq)]
enum VirtualSocketType {
    /// Either end of a pair created by `socketpair`.
    Socketpair,
    /// The end of a pipe that owns the read buffer.
    PipeRead,
    /// The end of a pipe that has no read buffer of its own.
    PipeWrite,
}
33
/// One endpoint of a connected pair (socketpair end or pipe end) that lives entirely inside
/// the interpreter.
#[derive(Debug)]
struct VirtualSocket {
    /// Data waiting to be read from this endpoint; the peer's writes land here.
    /// `None` for the write end of a pipe.
    readbuf: Option<RefCell<Buffer>>,
    /// Weak reference to the other endpoint. It is set exactly once, right after both
    /// endpoints have been created. A failed upgrade is treated as "peer closed".
    peer_fd: OnceCell<WeakFileDescriptionRef<VirtualSocket>>,
    /// Set by the peer when it is destroyed while its own read buffer still holds data.
    /// Reported as `epollerr` once the peer is gone.
    peer_lost_data: Cell<bool>,
    /// Threads currently blocked in a read on this endpoint; woken by a write from the peer.
    blocked_read_tid: RefCell<Vec<ThreadId>>,
    /// Threads currently blocked in a write on this endpoint; woken by a read on the peer.
    blocked_write_tid: RefCell<Vec<ThreadId>>,
    /// Whether reads and writes return `WouldBlock` instead of blocking the thread.
    is_nonblock: Cell<bool>,
    /// The kind of endpoint; decides the reported name, file mode and access flags.
    fd_type: VirtualSocketType,
}
59
/// Byte queue shared between the two endpoints, plus the clock used to order writers
/// before readers.
#[derive(Debug)]
struct Buffer {
    /// Bytes written by the peer that have not been read yet.
    buf: VecDeque<u8>,
    /// Every write joins the writer's released clock into this; every read acquires it.
    clock: VClock,
}
65
66impl Buffer {
67 fn new() -> Self {
68 Buffer { buf: VecDeque::new(), clock: VClock::default() }
69 }
70}
71
72impl VirtualSocket {
73 fn peer_fd(&self) -> &WeakFileDescriptionRef<VirtualSocket> {
74 self.peer_fd.get().unwrap()
75 }
76}
77
78impl FileDescription for VirtualSocket {
79 fn name(&self) -> &'static str {
80 match self.fd_type {
81 VirtualSocketType::Socketpair => "socketpair",
82 VirtualSocketType::PipeRead | VirtualSocketType::PipeWrite => "pipe",
83 }
84 }
85
86 fn metadata<'tcx>(
87 &self,
88 ) -> InterpResult<'tcx, Either<io::Result<std::fs::Metadata>, &'static str>> {
89 let mode_name = match self.fd_type {
90 VirtualSocketType::Socketpair => "S_IFSOCK",
91 VirtualSocketType::PipeRead | VirtualSocketType::PipeWrite => "S_IFIFO",
92 };
93 interp_ok(Either::Right(mode_name))
94 }
95
96 fn destroy<'tcx>(
97 self,
98 _self_id: FdId,
99 _communicate_allowed: bool,
100 ecx: &mut MiriInterpCx<'tcx>,
101 ) -> InterpResult<'tcx, io::Result<()>> {
102 if let Some(peer_fd) = self.peer_fd().upgrade() {
103 if let Some(readbuf) = &self.readbuf {
106 if !readbuf.borrow().buf.is_empty() {
107 peer_fd.peer_lost_data.set(true);
108 }
109 }
110 ecx.update_epoll_active_events(peer_fd, false)?;
112 }
113 interp_ok(Ok(()))
114 }
115
116 fn read<'tcx>(
117 self: FileDescriptionRef<Self>,
118 _communicate_allowed: bool,
119 ptr: Pointer,
120 len: usize,
121 ecx: &mut MiriInterpCx<'tcx>,
122 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
123 ) -> InterpResult<'tcx> {
124 virtual_socket_read(self, ptr, len, ecx, finish)
125 }
126
127 fn write<'tcx>(
128 self: FileDescriptionRef<Self>,
129 _communicate_allowed: bool,
130 ptr: Pointer,
131 len: usize,
132 ecx: &mut MiriInterpCx<'tcx>,
133 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
134 ) -> InterpResult<'tcx> {
135 virtual_socket_write(self, ptr, len, ecx, finish)
136 }
137
138 fn short_fd_operations(&self) -> bool {
139 false
144 }
145
146 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
147 self
148 }
149
150 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
151 let mut flags = 0;
152
153 match self.fd_type {
158 VirtualSocketType::Socketpair => {
159 flags |= ecx.eval_libc_i32("O_RDWR");
160 }
161 VirtualSocketType::PipeRead => {
162 flags |= ecx.eval_libc_i32("O_RDONLY");
163 }
164 VirtualSocketType::PipeWrite => {
165 flags |= ecx.eval_libc_i32("O_WRONLY");
166 }
167 }
168
169 if self.is_nonblock.get() {
171 flags |= ecx.eval_libc_i32("O_NONBLOCK");
172 }
173
174 interp_ok(Scalar::from_i32(flags))
175 }
176
177 fn set_flags<'tcx>(
178 &self,
179 mut flag: i32,
180 ecx: &mut MiriInterpCx<'tcx>,
181 ) -> InterpResult<'tcx, Scalar> {
182 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
183
184 if flag & o_nonblock == o_nonblock {
186 self.is_nonblock.set(true);
187 flag &= !o_nonblock;
188 } else {
189 self.is_nonblock.set(false);
190 }
191
192 if flag != 0 {
194 throw_unsup_format!(
195 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
196 )
197 }
198
199 interp_ok(Scalar::from_i32(0))
200 }
201}
202
203fn virtual_socket_write<'tcx>(
205 self_ref: FileDescriptionRef<VirtualSocket>,
206 ptr: Pointer,
207 len: usize,
208 ecx: &mut MiriInterpCx<'tcx>,
209 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
210) -> InterpResult<'tcx> {
211 if len == 0 {
214 return finish.call(ecx, Ok(0));
215 }
216
217 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
219 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
222 };
223
224 let Some(writebuf) = &peer_fd.readbuf else {
225 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
227 };
228
229 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
231 if available_space == 0 {
232 if self_ref.is_nonblock.get() {
233 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
235 } else {
236 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
237 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
240 ecx.block_thread(
241 BlockReason::VirtualSocket,
242 None,
243 callback!(
244 @capture<'tcx> {
245 weak_self_ref: WeakFileDescriptionRef<VirtualSocket>,
246 ptr: Pointer,
247 len: usize,
248 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
249 }
250 |this, unblock: UnblockKind| {
251 assert_eq!(unblock, UnblockKind::Ready);
252 let self_ref = weak_self_ref.upgrade().unwrap();
255 virtual_socket_write(self_ref, ptr, len, this, finish)
256 }
257 ),
258 );
259 }
260 } else {
261 let mut writebuf = writebuf.borrow_mut();
263 ecx.release_clock(|clock| {
265 writebuf.clock.join(clock);
266 })?;
267 let write_size = len.min(available_space);
269 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
270 assert_eq!(actual_write_size, write_size);
271
272 drop(writebuf);
274
275 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
277 for thread_id in waiting_threads {
279 ecx.unblock_thread(thread_id, BlockReason::VirtualSocket)?;
280 }
281 ecx.update_epoll_active_events(self_ref, false)?;
285 ecx.update_epoll_active_events(peer_fd, true)?;
286
287 return finish.call(ecx, Ok(write_size));
288 }
289 interp_ok(())
290}
291
292fn virtual_socket_read<'tcx>(
294 self_ref: FileDescriptionRef<VirtualSocket>,
295 ptr: Pointer,
296 len: usize,
297 ecx: &mut MiriInterpCx<'tcx>,
298 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
299) -> InterpResult<'tcx> {
300 if len == 0 {
302 return finish.call(ecx, Ok(0));
303 }
304
305 let Some(readbuf) = &self_ref.readbuf else {
306 throw_unsup_format!("reading from the write end of a pipe")
309 };
310
311 if readbuf.borrow_mut().buf.is_empty() {
312 if self_ref.peer_fd().upgrade().is_none() {
313 return finish.call(ecx, Ok(0));
316 } else if self_ref.is_nonblock.get() {
317 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
323 } else {
324 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
325 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
328 ecx.block_thread(
329 BlockReason::VirtualSocket,
330 None,
331 callback!(
332 @capture<'tcx> {
333 weak_self_ref: WeakFileDescriptionRef<VirtualSocket>,
334 ptr: Pointer,
335 len: usize,
336 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
337 }
338 |this, unblock: UnblockKind| {
339 assert_eq!(unblock, UnblockKind::Ready);
340 let self_ref = weak_self_ref.upgrade().unwrap();
343 virtual_socket_read(self_ref, ptr, len, this, finish)
344 }
345 ),
346 );
347 }
348 } else {
349 let mut readbuf = readbuf.borrow_mut();
351 ecx.acquire_clock(&readbuf.clock)?;
355
356 let read_size = ecx.read_from_host(|buf| readbuf.buf.read(buf), len, ptr)?.unwrap();
359 let readbuf_now_empty = readbuf.buf.is_empty();
360
361 drop(readbuf);
363
364 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
372 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
374 for thread_id in waiting_threads {
376 ecx.unblock_thread(thread_id, BlockReason::VirtualSocket)?;
377 }
378 ecx.update_epoll_active_events(peer_fd, readbuf_now_empty)?;
383 };
384 ecx.update_epoll_active_events(self_ref, false)?;
386
387 return finish.call(ecx, Ok(read_size));
388 }
389 interp_ok(())
390}
391
392impl UnixFileDescription for VirtualSocket {
393 fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> {
394 let mut epoll_readiness = EpollReadiness::empty();
398
399 if let Some(readbuf) = &self.readbuf {
401 if !readbuf.borrow().buf.is_empty() {
402 epoll_readiness.epollin = true;
403 }
404 } else {
405 epoll_readiness.epollin = true;
407 }
408
409 if let Some(peer_fd) = self.peer_fd().upgrade() {
411 if let Some(writebuf) = &peer_fd.readbuf {
412 let data_size = writebuf.borrow().buf.len();
413 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
414 if available_space != 0 {
415 epoll_readiness.epollout = true;
416 }
417 } else {
418 epoll_readiness.epollout = true;
420 }
421 } else {
422 epoll_readiness.epollrdhup = true;
425 epoll_readiness.epollhup = true;
426 epoll_readiness.epollin = true;
430 epoll_readiness.epollout = true;
431 if self.peer_lost_data.get() {
433 epoll_readiness.epollerr = true;
434 }
435 }
436 interp_ok(epoll_readiness)
437 }
438}
439
440impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
441pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
442 fn socketpair(
445 &mut self,
446 domain: &OpTy<'tcx>,
447 type_: &OpTy<'tcx>,
448 protocol: &OpTy<'tcx>,
449 sv: &OpTy<'tcx>,
450 ) -> InterpResult<'tcx, Scalar> {
451 let this = self.eval_context_mut();
452
453 let domain = this.read_scalar(domain)?.to_i32()?;
454 let mut flags = this.read_scalar(type_)?.to_i32()?;
455 let protocol = this.read_scalar(protocol)?.to_i32()?;
456 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
458
459 let mut is_sock_nonblock = false;
460
461 if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android) {
464 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
466 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
467 if flags & sock_nonblock == sock_nonblock {
468 is_sock_nonblock = true;
469 flags &= !sock_nonblock;
470 }
471 if flags & sock_cloexec == sock_cloexec {
472 flags &= !sock_cloexec;
473 }
474 }
475
476 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
480 throw_unsup_format!(
481 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
482 and AF_LOCAL are allowed",
483 domain
484 );
485 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
486 throw_unsup_format!(
487 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
488 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
489 flags
490 );
491 } else if protocol != 0 {
492 throw_unsup_format!(
493 "socketpair: socket protocol {protocol} is unsupported, \
494 only 0 is allowed",
495 );
496 }
497
498 let fds = &mut this.machine.fds;
500 let fd0 = fds.new_ref(VirtualSocket {
501 readbuf: Some(RefCell::new(Buffer::new())),
502 peer_fd: OnceCell::new(),
503 peer_lost_data: Cell::new(false),
504 blocked_read_tid: RefCell::new(Vec::new()),
505 blocked_write_tid: RefCell::new(Vec::new()),
506 is_nonblock: Cell::new(is_sock_nonblock),
507 fd_type: VirtualSocketType::Socketpair,
508 });
509 let fd1 = fds.new_ref(VirtualSocket {
510 readbuf: Some(RefCell::new(Buffer::new())),
511 peer_fd: OnceCell::new(),
512 peer_lost_data: Cell::new(false),
513 blocked_read_tid: RefCell::new(Vec::new()),
514 blocked_write_tid: RefCell::new(Vec::new()),
515 is_nonblock: Cell::new(is_sock_nonblock),
516 fd_type: VirtualSocketType::Socketpair,
517 });
518
519 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
521 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
522
523 let sv0 = fds.insert(fd0);
525 let sv1 = fds.insert(fd1);
526
527 let sv0 = Scalar::from_int(sv0, sv.layout.size);
529 let sv1 = Scalar::from_int(sv1, sv.layout.size);
530 this.write_scalar(sv0, &sv)?;
531 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
532
533 interp_ok(Scalar::from_i32(0))
534 }
535
536 fn pipe2(
537 &mut self,
538 pipefd: &OpTy<'tcx>,
539 flags: Option<&OpTy<'tcx>>,
540 ) -> InterpResult<'tcx, Scalar> {
541 let this = self.eval_context_mut();
542
543 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
544 let mut flags = match flags {
545 Some(flags) => this.read_scalar(flags)?.to_i32()?,
546 None => 0,
547 };
548
549 let cloexec = this.eval_libc_i32("O_CLOEXEC");
550 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
551
552 let mut is_nonblock = false;
555 if flags & o_nonblock == o_nonblock {
556 is_nonblock = true;
557 flags &= !o_nonblock;
558 }
559 if flags & cloexec == cloexec {
561 flags &= !cloexec;
562 }
563 if flags != 0 {
564 throw_unsup_format!("unsupported flags in `pipe2`");
565 }
566
567 let fds = &mut this.machine.fds;
570 let fd0 = fds.new_ref(VirtualSocket {
571 readbuf: Some(RefCell::new(Buffer::new())),
572 peer_fd: OnceCell::new(),
573 peer_lost_data: Cell::new(false),
574 blocked_read_tid: RefCell::new(Vec::new()),
575 blocked_write_tid: RefCell::new(Vec::new()),
576 is_nonblock: Cell::new(is_nonblock),
577 fd_type: VirtualSocketType::PipeRead,
578 });
579 let fd1 = fds.new_ref(VirtualSocket {
580 readbuf: None,
581 peer_fd: OnceCell::new(),
582 peer_lost_data: Cell::new(false),
583 blocked_read_tid: RefCell::new(Vec::new()),
584 blocked_write_tid: RefCell::new(Vec::new()),
585 is_nonblock: Cell::new(is_nonblock),
586 fd_type: VirtualSocketType::PipeWrite,
587 });
588
589 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
591 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
592
593 let pipefd0 = fds.insert(fd0);
595 let pipefd1 = fds.insert(fd1);
596
597 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
599 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
600 this.write_scalar(pipefd0, &pipefd)?;
601 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
602
603 interp_ok(Scalar::from_i32(0))
604 }
605}