1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use rustc_target::spec::Os;
11
12use crate::concurrency::VClock;
13use crate::shims::files::{
14 EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
15};
16use crate::shims::unix::UnixFileDescription;
17use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
18use crate::*;
19
20const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 0x34000;
24
25#[derive(Debug, PartialEq)]
26enum AnonSocketType {
27 Socketpair,
29 PipeRead,
31 PipeWrite,
33}
34
35#[derive(Debug)]
37struct AnonSocket {
38 readbuf: Option<RefCell<Buffer>>,
41 peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
45 peer_lost_data: Cell<bool>,
49 blocked_read_tid: RefCell<Vec<ThreadId>>,
52 blocked_write_tid: RefCell<Vec<ThreadId>>,
55 is_nonblock: Cell<bool>,
57 fd_type: AnonSocketType,
59}
60
61#[derive(Debug)]
62struct Buffer {
63 buf: VecDeque<u8>,
64 clock: VClock,
65}
66
67impl Buffer {
68 fn new() -> Self {
69 Buffer { buf: VecDeque::new(), clock: VClock::default() }
70 }
71}
72
73impl AnonSocket {
74 fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
75 self.peer_fd.get().unwrap()
76 }
77}
78
79impl FileDescription for AnonSocket {
80 fn name(&self) -> &'static str {
81 match self.fd_type {
82 AnonSocketType::Socketpair => "socketpair",
83 AnonSocketType::PipeRead | AnonSocketType::PipeWrite => "pipe",
84 }
85 }
86
87 fn destroy<'tcx>(
88 self,
89 _self_id: FdId,
90 _communicate_allowed: bool,
91 ecx: &mut MiriInterpCx<'tcx>,
92 ) -> InterpResult<'tcx, io::Result<()>> {
93 if let Some(peer_fd) = self.peer_fd().upgrade() {
94 if let Some(readbuf) = &self.readbuf {
97 if !readbuf.borrow().buf.is_empty() {
98 peer_fd.peer_lost_data.set(true);
99 }
100 }
101 ecx.update_epoll_active_events(peer_fd, false)?;
103 }
104 interp_ok(Ok(()))
105 }
106
107 fn read<'tcx>(
108 self: FileDescriptionRef<Self>,
109 _communicate_allowed: bool,
110 ptr: Pointer,
111 len: usize,
112 ecx: &mut MiriInterpCx<'tcx>,
113 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
114 ) -> InterpResult<'tcx> {
115 anonsocket_read(self, ptr, len, ecx, finish)
116 }
117
118 fn write<'tcx>(
119 self: FileDescriptionRef<Self>,
120 _communicate_allowed: bool,
121 ptr: Pointer,
122 len: usize,
123 ecx: &mut MiriInterpCx<'tcx>,
124 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
125 ) -> InterpResult<'tcx> {
126 anonsocket_write(self, ptr, len, ecx, finish)
127 }
128
129 fn short_fd_operations(&self) -> bool {
130 matches!(self.fd_type, AnonSocketType::Socketpair)
135 }
136
137 fn as_unix<'tcx>(&self, _ecx: &MiriInterpCx<'tcx>) -> &dyn UnixFileDescription {
138 self
139 }
140
141 fn get_flags<'tcx>(&self, ecx: &mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Scalar> {
142 let mut flags = 0;
143
144 match self.fd_type {
149 AnonSocketType::Socketpair => {
150 flags |= ecx.eval_libc_i32("O_RDWR");
151 }
152 AnonSocketType::PipeRead => {
153 flags |= ecx.eval_libc_i32("O_RDONLY");
154 }
155 AnonSocketType::PipeWrite => {
156 flags |= ecx.eval_libc_i32("O_WRONLY");
157 }
158 }
159
160 if self.is_nonblock.get() {
162 flags |= ecx.eval_libc_i32("O_NONBLOCK");
163 }
164
165 interp_ok(Scalar::from_i32(flags))
166 }
167
168 fn set_flags<'tcx>(
169 &self,
170 mut flag: i32,
171 ecx: &mut MiriInterpCx<'tcx>,
172 ) -> InterpResult<'tcx, Scalar> {
173 let o_nonblock = ecx.eval_libc_i32("O_NONBLOCK");
176 let o_rdonly = ecx.eval_libc_i32("O_RDONLY");
177 let o_wronly = ecx.eval_libc_i32("O_WRONLY");
178 let o_rdwr = ecx.eval_libc_i32("O_RDWR");
179
180 if flag & o_nonblock == o_nonblock {
182 self.is_nonblock.set(true);
183 flag &= !o_nonblock;
184 } else {
185 self.is_nonblock.set(false);
186 }
187
188 flag &= !(o_rdonly | o_wronly | o_rdwr);
190
191 if flag != 0 {
193 throw_unsup_format!(
194 "fcntl: only O_NONBLOCK is supported for F_SETFL on socketpairs and pipes"
195 )
196 }
197
198 interp_ok(Scalar::from_i32(0))
199 }
200}
201
202fn anonsocket_write<'tcx>(
204 self_ref: FileDescriptionRef<AnonSocket>,
205 ptr: Pointer,
206 len: usize,
207 ecx: &mut MiriInterpCx<'tcx>,
208 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
209) -> InterpResult<'tcx> {
210 if len == 0 {
213 return finish.call(ecx, Ok(0));
214 }
215
216 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
218 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
221 };
222
223 let Some(writebuf) = &peer_fd.readbuf else {
224 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
226 };
227
228 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
230 if available_space == 0 {
231 if self_ref.is_nonblock.get() {
232 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
234 } else {
235 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
236 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
239 ecx.block_thread(
240 BlockReason::UnnamedSocket,
241 None,
242 callback!(
243 @capture<'tcx> {
244 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
245 ptr: Pointer,
246 len: usize,
247 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
248 }
249 |this, unblock: UnblockKind| {
250 assert_eq!(unblock, UnblockKind::Ready);
251 let self_ref = weak_self_ref.upgrade().unwrap();
254 anonsocket_write(self_ref, ptr, len, this, finish)
255 }
256 ),
257 );
258 }
259 } else {
260 let mut writebuf = writebuf.borrow_mut();
262 ecx.release_clock(|clock| {
264 writebuf.clock.join(clock);
265 })?;
266 let write_size = len.min(available_space);
268 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
269 assert_eq!(actual_write_size, write_size);
270
271 drop(writebuf);
273
274 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
276 for thread_id in waiting_threads {
278 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
279 }
280 ecx.update_epoll_active_events(self_ref, false)?;
284 ecx.update_epoll_active_events(peer_fd, true)?;
285
286 return finish.call(ecx, Ok(write_size));
287 }
288 interp_ok(())
289}
290
291fn anonsocket_read<'tcx>(
293 self_ref: FileDescriptionRef<AnonSocket>,
294 ptr: Pointer,
295 len: usize,
296 ecx: &mut MiriInterpCx<'tcx>,
297 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
298) -> InterpResult<'tcx> {
299 if len == 0 {
301 return finish.call(ecx, Ok(0));
302 }
303
304 let Some(readbuf) = &self_ref.readbuf else {
305 throw_unsup_format!("reading from the write end of a pipe")
308 };
309
310 if readbuf.borrow_mut().buf.is_empty() {
311 if self_ref.peer_fd().upgrade().is_none() {
312 return finish.call(ecx, Ok(0));
315 } else if self_ref.is_nonblock.get() {
316 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
322 } else {
323 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
324 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
327 ecx.block_thread(
328 BlockReason::UnnamedSocket,
329 None,
330 callback!(
331 @capture<'tcx> {
332 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
333 ptr: Pointer,
334 len: usize,
335 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
336 }
337 |this, unblock: UnblockKind| {
338 assert_eq!(unblock, UnblockKind::Ready);
339 let self_ref = weak_self_ref.upgrade().unwrap();
342 anonsocket_read(self_ref, ptr, len, this, finish)
343 }
344 ),
345 );
346 }
347 } else {
348 let mut readbuf = readbuf.borrow_mut();
350 ecx.acquire_clock(&readbuf.clock)?;
354
355 let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
358 let readbuf_now_empty = readbuf.buf.is_empty();
359
360 drop(readbuf);
362
363 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
371 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
373 for thread_id in waiting_threads {
375 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
376 }
377 ecx.update_epoll_active_events(peer_fd, readbuf_now_empty)?;
382 };
383 ecx.update_epoll_active_events(self_ref, false)?;
385
386 return finish.call(ecx, Ok(read_size));
387 }
388 interp_ok(())
389}
390
391impl UnixFileDescription for AnonSocket {
392 fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
393 let mut epoll_ready_events = EpollEvents::new();
397
398 if let Some(readbuf) = &self.readbuf {
400 if !readbuf.borrow().buf.is_empty() {
401 epoll_ready_events.epollin = true;
402 }
403 } else {
404 epoll_ready_events.epollin = true;
406 }
407
408 if let Some(peer_fd) = self.peer_fd().upgrade() {
410 if let Some(writebuf) = &peer_fd.readbuf {
411 let data_size = writebuf.borrow().buf.len();
412 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
413 if available_space != 0 {
414 epoll_ready_events.epollout = true;
415 }
416 } else {
417 epoll_ready_events.epollout = true;
419 }
420 } else {
421 epoll_ready_events.epollrdhup = true;
424 epoll_ready_events.epollhup = true;
425 epoll_ready_events.epollin = true;
429 epoll_ready_events.epollout = true;
430 if self.peer_lost_data.get() {
432 epoll_ready_events.epollerr = true;
433 }
434 }
435 interp_ok(epoll_ready_events)
436 }
437}
438
439impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
440pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
441 fn socketpair(
444 &mut self,
445 domain: &OpTy<'tcx>,
446 type_: &OpTy<'tcx>,
447 protocol: &OpTy<'tcx>,
448 sv: &OpTy<'tcx>,
449 ) -> InterpResult<'tcx, Scalar> {
450 let this = self.eval_context_mut();
451
452 let domain = this.read_scalar(domain)?.to_i32()?;
453 let mut flags = this.read_scalar(type_)?.to_i32()?;
454 let protocol = this.read_scalar(protocol)?.to_i32()?;
455 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
457
458 let mut is_sock_nonblock = false;
459
460 if this.tcx.sess.target.os == Os::Linux {
463 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
465 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
466 if flags & sock_nonblock == sock_nonblock {
467 is_sock_nonblock = true;
468 flags &= !sock_nonblock;
469 }
470 if flags & sock_cloexec == sock_cloexec {
471 flags &= !sock_cloexec;
472 }
473 }
474
475 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
479 throw_unsup_format!(
480 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
481 and AF_LOCAL are allowed",
482 domain
483 );
484 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
485 throw_unsup_format!(
486 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
487 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
488 flags
489 );
490 } else if protocol != 0 {
491 throw_unsup_format!(
492 "socketpair: socket protocol {protocol} is unsupported, \
493 only 0 is allowed",
494 );
495 }
496
497 let fds = &mut this.machine.fds;
499 let fd0 = fds.new_ref(AnonSocket {
500 readbuf: Some(RefCell::new(Buffer::new())),
501 peer_fd: OnceCell::new(),
502 peer_lost_data: Cell::new(false),
503 blocked_read_tid: RefCell::new(Vec::new()),
504 blocked_write_tid: RefCell::new(Vec::new()),
505 is_nonblock: Cell::new(is_sock_nonblock),
506 fd_type: AnonSocketType::Socketpair,
507 });
508 let fd1 = fds.new_ref(AnonSocket {
509 readbuf: Some(RefCell::new(Buffer::new())),
510 peer_fd: OnceCell::new(),
511 peer_lost_data: Cell::new(false),
512 blocked_read_tid: RefCell::new(Vec::new()),
513 blocked_write_tid: RefCell::new(Vec::new()),
514 is_nonblock: Cell::new(is_sock_nonblock),
515 fd_type: AnonSocketType::Socketpair,
516 });
517
518 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
520 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
521
522 let sv0 = fds.insert(fd0);
524 let sv1 = fds.insert(fd1);
525
526 let sv0 = Scalar::from_int(sv0, sv.layout.size);
528 let sv1 = Scalar::from_int(sv1, sv.layout.size);
529 this.write_scalar(sv0, &sv)?;
530 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
531
532 interp_ok(Scalar::from_i32(0))
533 }
534
535 fn pipe2(
536 &mut self,
537 pipefd: &OpTy<'tcx>,
538 flags: Option<&OpTy<'tcx>>,
539 ) -> InterpResult<'tcx, Scalar> {
540 let this = self.eval_context_mut();
541
542 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
543 let mut flags = match flags {
544 Some(flags) => this.read_scalar(flags)?.to_i32()?,
545 None => 0,
546 };
547
548 let cloexec = this.eval_libc_i32("O_CLOEXEC");
549 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
550
551 let mut is_nonblock = false;
554 if flags & o_nonblock == o_nonblock {
555 is_nonblock = true;
556 flags &= !o_nonblock;
557 }
558 if flags & cloexec == cloexec {
560 flags &= !cloexec;
561 }
562 if flags != 0 {
563 throw_unsup_format!("unsupported flags in `pipe2`");
564 }
565
566 let fds = &mut this.machine.fds;
569 let fd0 = fds.new_ref(AnonSocket {
570 readbuf: Some(RefCell::new(Buffer::new())),
571 peer_fd: OnceCell::new(),
572 peer_lost_data: Cell::new(false),
573 blocked_read_tid: RefCell::new(Vec::new()),
574 blocked_write_tid: RefCell::new(Vec::new()),
575 is_nonblock: Cell::new(is_nonblock),
576 fd_type: AnonSocketType::PipeRead,
577 });
578 let fd1 = fds.new_ref(AnonSocket {
579 readbuf: None,
580 peer_fd: OnceCell::new(),
581 peer_lost_data: Cell::new(false),
582 blocked_read_tid: RefCell::new(Vec::new()),
583 blocked_write_tid: RefCell::new(Vec::new()),
584 is_nonblock: Cell::new(is_nonblock),
585 fd_type: AnonSocketType::PipeWrite,
586 });
587
588 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
590 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
591
592 let pipefd0 = fds.insert(fd0);
594 let pipefd1 = fds.insert(fd1);
595
596 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
598 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
599 this.write_scalar(pipefd0, &pipefd)?;
600 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
601
602 interp_ok(Scalar::from_i32(0))
603 }
604}