1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io::{self, ErrorKind, Read};
8
9use rustc_target::spec::Os;
10
11use crate::concurrency::VClock;
12use crate::shims::files::{
13 EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
14};
15use crate::shims::unix::UnixFileDescription;
16use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
17use crate::*;
18
19const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 0x34000;
23
24#[derive(Debug, PartialEq)]
25enum VirtualSocketType {
26 Socketpair,
28 PipeRead,
30 PipeWrite,
32}
33
34#[derive(Debug)]
36struct VirtualSocket {
37 readbuf: Option<RefCell<Buffer>>,
40 peer_fd: OnceCell<WeakFileDescriptionRef<VirtualSocket>>,
44 peer_lost_data: Cell<bool>,
48 blocked_read_tid: RefCell<Vec<ThreadId>>,
51 blocked_write_tid: RefCell<Vec<ThreadId>>,
54 is_nonblock: Cell<bool>,
56 fd_type: VirtualSocketType,
58}
59
60#[derive(Debug)]
61struct Buffer {
62 buf: VecDeque<u8>,
63 clock: VClock,
64}
65
66impl Buffer {
67 fn new() -> Self {
68 Buffer { buf: VecDeque::new(), clock: VClock::default() }
69 }
70}
71
72impl VirtualSocket {
73 fn peer_fd(&self) -> &WeakFileDescriptionRef<VirtualSocket> {
74 self.peer_fd.get().unwrap()
75 }
76}
77
78impl FileDescription for VirtualSocket {
79 fn name(&self) -> &'static str {
80 match self.fd_type {
81 VirtualSocketType::Socketpair => "socketpair",
82 VirtualSocketType::PipeRead | VirtualSocketType::PipeWrite => "pipe",
83 }
84 }
85
86 fn metadata<'tcx>(
87 &self,
88 ) -> InterpResult<'tcx, Either<io::Result<std::fs::Metadata>, &'static str>> {
89 let mode_name = match self.fd_type {
90 VirtualSocketType::Socketpair => "S_IFSOCK",
91 VirtualSocketType::PipeRead | VirtualSocketType::PipeWrite => "S_IFIFO",
92 };
93 interp_ok(Either::Right(mode_name))
94 }
95
96 fn destroy<'tcx>(
97 self,
98 _self_id: FdId,
99 _communicate_allowed: bool,
100 ecx: &mut MiriInterpCx<'tcx>,
101 ) -> InterpResult<'tcx, io::Result<()>> {
102 if let Some(peer_fd) = self.peer_fd().upgrade() {
103 if let Some(readbuf) = &self.readbuf {
106 if !readbuf.borrow().buf.is_empty() {
107 peer_fd.peer_lost_data.set(true);
108 }
109 }
110 ecx.update_epoll_active_events(peer_fd, false)?;
112 }
113 interp_ok(Ok(()))
114 }
115
116 fn read<'tcx>(
117 self: FileDescriptionRef<Self>,
118 _communicate_allowed: bool,
119 ptr: Pointer,
120 len: usize,
121 ecx: &mut MiriInterpCx<'tcx>,
122 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
123 ) -> InterpResult<'tcx> {
124 virtual_socket_read(self, ptr, len, ecx, finish)
125 }
126
127 fn write<'tcx>(
128 self: FileDescriptionRef<Self>,
129 _communicate_allowed: bool,
130 ptr: Pointer,
131 len: usize,
132 ecx: &mut MiriInterpCx<'tcx>,
133 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
134 ) -> InterpResult<'tcx> {
135 virtual_socket_write(self, ptr, len, ecx, finish)
136 }
137
138 fn short_fd_operations(&self) -> bool {
139 false
146 }
147
148 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
149 self
150 }
151
152 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
153 let mut flags = 0;
154
155 match self.fd_type {
160 VirtualSocketType::Socketpair => {
161 flags |= ecx.eval_libc_i32("O_RDWR");
162 }
163 VirtualSocketType::PipeRead => {
164 flags |= ecx.eval_libc_i32("O_RDONLY");
165 }
166 VirtualSocketType::PipeWrite => {
167 flags |= ecx.eval_libc_i32("O_WRONLY");
168 }
169 }
170
171 if self.is_nonblock.get() {
173 flags |= ecx.eval_libc_i32("O_NONBLOCK");
174 }
175
176 interp_ok(Scalar::from_i32(flags))
177 }
178
179 fn set_flags<'tcx>(
180 &self,
181 mut flag: i32,
182 ecx: &mut MiriInterpCx<'tcx>,
183 ) -> InterpResult<'tcx, Scalar> {
184 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
185
186 if flag & o_nonblock == o_nonblock {
188 self.is_nonblock.set(true);
189 flag &= !o_nonblock;
190 } else {
191 self.is_nonblock.set(false);
192 }
193
194 if flag != 0 {
196 throw_unsup_format!(
197 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
198 )
199 }
200
201 interp_ok(Scalar::from_i32(0))
202 }
203}
204
205fn virtual_socket_write<'tcx>(
207 self_ref: FileDescriptionRef<VirtualSocket>,
208 ptr: Pointer,
209 len: usize,
210 ecx: &mut MiriInterpCx<'tcx>,
211 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
212) -> InterpResult<'tcx> {
213 if len == 0 {
216 return finish.call(ecx, Ok(0));
217 }
218
219 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
221 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
224 };
225
226 let Some(writebuf) = &peer_fd.readbuf else {
227 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
229 };
230
231 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
233 if available_space == 0 {
234 if self_ref.is_nonblock.get() {
235 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
237 } else {
238 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
239 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
242 ecx.block_thread(
243 BlockReason::VirtualSocket,
244 None,
245 callback!(
246 @capture<'tcx> {
247 weak_self_ref: WeakFileDescriptionRef<VirtualSocket>,
248 ptr: Pointer,
249 len: usize,
250 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
251 }
252 |this, unblock: UnblockKind| {
253 assert_eq!(unblock, UnblockKind::Ready);
254 let self_ref = weak_self_ref.upgrade().unwrap();
257 virtual_socket_write(self_ref, ptr, len, this, finish)
258 }
259 ),
260 );
261 }
262 } else {
263 let mut writebuf = writebuf.borrow_mut();
265 ecx.release_clock(|clock| {
267 writebuf.clock.join(clock);
268 })?;
269 let write_size = len.min(available_space);
271 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
272 assert_eq!(actual_write_size, write_size);
273
274 drop(writebuf);
276
277 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
279 for thread_id in waiting_threads {
281 ecx.unblock_thread(thread_id, BlockReason::VirtualSocket)?;
282 }
283 ecx.update_epoll_active_events(self_ref, false)?;
287 ecx.update_epoll_active_events(peer_fd, true)?;
288
289 return finish.call(ecx, Ok(write_size));
290 }
291 interp_ok(())
292}
293
294fn virtual_socket_read<'tcx>(
296 self_ref: FileDescriptionRef<VirtualSocket>,
297 ptr: Pointer,
298 len: usize,
299 ecx: &mut MiriInterpCx<'tcx>,
300 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
301) -> InterpResult<'tcx> {
302 if len == 0 {
304 return finish.call(ecx, Ok(0));
305 }
306
307 let Some(readbuf) = &self_ref.readbuf else {
308 throw_unsup_format!("reading from the write end of a pipe")
311 };
312
313 if readbuf.borrow_mut().buf.is_empty() {
314 if self_ref.peer_fd().upgrade().is_none() {
315 return finish.call(ecx, Ok(0));
318 } else if self_ref.is_nonblock.get() {
319 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
325 } else {
326 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
327 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
330 ecx.block_thread(
331 BlockReason::VirtualSocket,
332 None,
333 callback!(
334 @capture<'tcx> {
335 weak_self_ref: WeakFileDescriptionRef<VirtualSocket>,
336 ptr: Pointer,
337 len: usize,
338 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
339 }
340 |this, unblock: UnblockKind| {
341 assert_eq!(unblock, UnblockKind::Ready);
342 let self_ref = weak_self_ref.upgrade().unwrap();
345 virtual_socket_read(self_ref, ptr, len, this, finish)
346 }
347 ),
348 );
349 }
350 } else {
351 let mut readbuf = readbuf.borrow_mut();
353 ecx.acquire_clock(&readbuf.clock)?;
357
358 let read_size = ecx.read_from_host(|buf| readbuf.buf.read(buf), len, ptr)?.unwrap();
361 let readbuf_now_empty = readbuf.buf.is_empty();
362
363 drop(readbuf);
365
366 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
374 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
376 for thread_id in waiting_threads {
378 ecx.unblock_thread(thread_id, BlockReason::VirtualSocket)?;
379 }
380 ecx.update_epoll_active_events(peer_fd, readbuf_now_empty)?;
385 };
386 ecx.update_epoll_active_events(self_ref, false)?;
388
389 return finish.call(ecx, Ok(read_size));
390 }
391 interp_ok(())
392}
393
394impl UnixFileDescription for VirtualSocket {
395 fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
396 let mut epoll_ready_events = EpollEvents::new();
400
401 if let Some(readbuf) = &self.readbuf {
403 if !readbuf.borrow().buf.is_empty() {
404 epoll_ready_events.epollin = true;
405 }
406 } else {
407 epoll_ready_events.epollin = true;
409 }
410
411 if let Some(peer_fd) = self.peer_fd().upgrade() {
413 if let Some(writebuf) = &peer_fd.readbuf {
414 let data_size = writebuf.borrow().buf.len();
415 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
416 if available_space != 0 {
417 epoll_ready_events.epollout = true;
418 }
419 } else {
420 epoll_ready_events.epollout = true;
422 }
423 } else {
424 epoll_ready_events.epollrdhup = true;
427 epoll_ready_events.epollhup = true;
428 epoll_ready_events.epollin = true;
432 epoll_ready_events.epollout = true;
433 if self.peer_lost_data.get() {
435 epoll_ready_events.epollerr = true;
436 }
437 }
438 interp_ok(epoll_ready_events)
439 }
440}
441
442impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
443pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
444 fn socketpair(
447 &mut self,
448 domain: &OpTy<'tcx>,
449 type_: &OpTy<'tcx>,
450 protocol: &OpTy<'tcx>,
451 sv: &OpTy<'tcx>,
452 ) -> InterpResult<'tcx, Scalar> {
453 let this = self.eval_context_mut();
454
455 let domain = this.read_scalar(domain)?.to_i32()?;
456 let mut flags = this.read_scalar(type_)?.to_i32()?;
457 let protocol = this.read_scalar(protocol)?.to_i32()?;
458 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
460
461 let mut is_sock_nonblock = false;
462
463 if matches!(this.tcx.sess.target.os, Os::Linux | Os::Android) {
466 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
468 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
469 if flags & sock_nonblock == sock_nonblock {
470 is_sock_nonblock = true;
471 flags &= !sock_nonblock;
472 }
473 if flags & sock_cloexec == sock_cloexec {
474 flags &= !sock_cloexec;
475 }
476 }
477
478 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
482 throw_unsup_format!(
483 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
484 and AF_LOCAL are allowed",
485 domain
486 );
487 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
488 throw_unsup_format!(
489 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
490 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
491 flags
492 );
493 } else if protocol != 0 {
494 throw_unsup_format!(
495 "socketpair: socket protocol {protocol} is unsupported, \
496 only 0 is allowed",
497 );
498 }
499
500 let fds = &mut this.machine.fds;
502 let fd0 = fds.new_ref(VirtualSocket {
503 readbuf: Some(RefCell::new(Buffer::new())),
504 peer_fd: OnceCell::new(),
505 peer_lost_data: Cell::new(false),
506 blocked_read_tid: RefCell::new(Vec::new()),
507 blocked_write_tid: RefCell::new(Vec::new()),
508 is_nonblock: Cell::new(is_sock_nonblock),
509 fd_type: VirtualSocketType::Socketpair,
510 });
511 let fd1 = fds.new_ref(VirtualSocket {
512 readbuf: Some(RefCell::new(Buffer::new())),
513 peer_fd: OnceCell::new(),
514 peer_lost_data: Cell::new(false),
515 blocked_read_tid: RefCell::new(Vec::new()),
516 blocked_write_tid: RefCell::new(Vec::new()),
517 is_nonblock: Cell::new(is_sock_nonblock),
518 fd_type: VirtualSocketType::Socketpair,
519 });
520
521 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
523 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
524
525 let sv0 = fds.insert(fd0);
527 let sv1 = fds.insert(fd1);
528
529 let sv0 = Scalar::from_int(sv0, sv.layout.size);
531 let sv1 = Scalar::from_int(sv1, sv.layout.size);
532 this.write_scalar(sv0, &sv)?;
533 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
534
535 interp_ok(Scalar::from_i32(0))
536 }
537
538 fn pipe2(
539 &mut self,
540 pipefd: &OpTy<'tcx>,
541 flags: Option<&OpTy<'tcx>>,
542 ) -> InterpResult<'tcx, Scalar> {
543 let this = self.eval_context_mut();
544
545 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
546 let mut flags = match flags {
547 Some(flags) => this.read_scalar(flags)?.to_i32()?,
548 None => 0,
549 };
550
551 let cloexec = this.eval_libc_i32("O_CLOEXEC");
552 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
553
554 let mut is_nonblock = false;
557 if flags & o_nonblock == o_nonblock {
558 is_nonblock = true;
559 flags &= !o_nonblock;
560 }
561 if flags & cloexec == cloexec {
563 flags &= !cloexec;
564 }
565 if flags != 0 {
566 throw_unsup_format!("unsupported flags in `pipe2`");
567 }
568
569 let fds = &mut this.machine.fds;
572 let fd0 = fds.new_ref(VirtualSocket {
573 readbuf: Some(RefCell::new(Buffer::new())),
574 peer_fd: OnceCell::new(),
575 peer_lost_data: Cell::new(false),
576 blocked_read_tid: RefCell::new(Vec::new()),
577 blocked_write_tid: RefCell::new(Vec::new()),
578 is_nonblock: Cell::new(is_nonblock),
579 fd_type: VirtualSocketType::PipeRead,
580 });
581 let fd1 = fds.new_ref(VirtualSocket {
582 readbuf: None,
583 peer_fd: OnceCell::new(),
584 peer_lost_data: Cell::new(false),
585 blocked_read_tid: RefCell::new(Vec::new()),
586 blocked_write_tid: RefCell::new(Vec::new()),
587 is_nonblock: Cell::new(is_nonblock),
588 fd_type: VirtualSocketType::PipeWrite,
589 });
590
591 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
593 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
594
595 let pipefd0 = fds.insert(fd0);
597 let pipefd1 = fds.insert(fd1);
598
599 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
601 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
602 this.write_scalar(pipefd0, &pipefd)?;
603 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
604
605 interp_ok(Scalar::from_i32(0))
606 }
607}