1use std::cell::{Cell, OnceCell, RefCell};
6use std::collections::VecDeque;
7use std::io;
8use std::io::ErrorKind;
9
10use crate::concurrency::VClock;
11use crate::shims::files::{
12 EvalContextExt as _, FileDescription, FileDescriptionRef, WeakFileDescriptionRef,
13};
14use crate::shims::unix::UnixFileDescription;
15use crate::shims::unix::linux_like::epoll::{EpollReadyEvents, EvalContextExt as _};
16use crate::*;
17
18const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
22
23#[derive(Debug)]
25struct AnonSocket {
26 readbuf: Option<RefCell<Buffer>>,
29 peer_fd: OnceCell<WeakFileDescriptionRef<AnonSocket>>,
33 peer_lost_data: Cell<bool>,
37 blocked_read_tid: RefCell<Vec<ThreadId>>,
40 blocked_write_tid: RefCell<Vec<ThreadId>>,
43 is_nonblock: bool,
44}
45
46#[derive(Debug)]
47struct Buffer {
48 buf: VecDeque<u8>,
49 clock: VClock,
50}
51
52impl Buffer {
53 fn new() -> Self {
54 Buffer { buf: VecDeque::new(), clock: VClock::default() }
55 }
56}
57
58impl AnonSocket {
59 fn peer_fd(&self) -> &WeakFileDescriptionRef<AnonSocket> {
60 self.peer_fd.get().unwrap()
61 }
62}
63
64impl FileDescription for AnonSocket {
65 fn name(&self) -> &'static str {
66 "socketpair"
67 }
68
69 fn close<'tcx>(
70 self,
71 _communicate_allowed: bool,
72 ecx: &mut MiriInterpCx<'tcx>,
73 ) -> InterpResult<'tcx, io::Result<()>> {
74 if let Some(peer_fd) = self.peer_fd().upgrade() {
75 if let Some(readbuf) = &self.readbuf {
78 if !readbuf.borrow().buf.is_empty() {
79 peer_fd.peer_lost_data.set(true);
80 }
81 }
82 ecx.check_and_update_readiness(peer_fd)?;
84 }
85 interp_ok(Ok(()))
86 }
87
88 fn read<'tcx>(
89 self: FileDescriptionRef<Self>,
90 _communicate_allowed: bool,
91 ptr: Pointer,
92 len: usize,
93 ecx: &mut MiriInterpCx<'tcx>,
94 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
95 ) -> InterpResult<'tcx> {
96 anonsocket_read(self, ptr, len, ecx, finish)
97 }
98
99 fn write<'tcx>(
100 self: FileDescriptionRef<Self>,
101 _communicate_allowed: bool,
102 ptr: Pointer,
103 len: usize,
104 ecx: &mut MiriInterpCx<'tcx>,
105 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
106 ) -> InterpResult<'tcx> {
107 anonsocket_write(self, ptr, len, ecx, finish)
108 }
109
110 fn as_unix(&self) -> &dyn UnixFileDescription {
111 self
112 }
113}
114
115fn anonsocket_write<'tcx>(
117 self_ref: FileDescriptionRef<AnonSocket>,
118 ptr: Pointer,
119 len: usize,
120 ecx: &mut MiriInterpCx<'tcx>,
121 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
122) -> InterpResult<'tcx> {
123 if len == 0 {
126 return finish.call(ecx, Ok(0));
127 }
128
129 let Some(peer_fd) = self_ref.peer_fd().upgrade() else {
131 return finish.call(ecx, Err(ErrorKind::BrokenPipe.into()));
134 };
135
136 let Some(writebuf) = &peer_fd.readbuf else {
137 return finish.call(ecx, Err(IoError::LibcError("EBADF")));
139 };
140
141 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
143 if available_space == 0 {
144 if self_ref.is_nonblock {
145 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
147 } else {
148 self_ref.blocked_write_tid.borrow_mut().push(ecx.active_thread());
149 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
152 ecx.block_thread(
153 BlockReason::UnnamedSocket,
154 None,
155 callback!(
156 @capture<'tcx> {
157 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
158 ptr: Pointer,
159 len: usize,
160 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
161 }
162 |this, unblock: UnblockKind| {
163 assert_eq!(unblock, UnblockKind::Ready);
164 let self_ref = weak_self_ref.upgrade().unwrap();
167 anonsocket_write(self_ref, ptr, len, this, finish)
168 }
169 ),
170 );
171 }
172 } else {
173 let mut writebuf = writebuf.borrow_mut();
175 ecx.release_clock(|clock| {
177 writebuf.clock.join(clock);
178 });
179 let write_size = len.min(available_space);
181 let actual_write_size = ecx.write_to_host(&mut writebuf.buf, write_size, ptr)?.unwrap();
182 assert_eq!(actual_write_size, write_size);
183
184 drop(writebuf);
186
187 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_read_tid.borrow_mut());
189 for thread_id in waiting_threads {
191 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
192 }
193 ecx.check_and_update_readiness(peer_fd)?;
196
197 return finish.call(ecx, Ok(write_size));
198 }
199 interp_ok(())
200}
201
202fn anonsocket_read<'tcx>(
204 self_ref: FileDescriptionRef<AnonSocket>,
205 ptr: Pointer,
206 len: usize,
207 ecx: &mut MiriInterpCx<'tcx>,
208 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
209) -> InterpResult<'tcx> {
210 if len == 0 {
212 return finish.call(ecx, Ok(0));
213 }
214
215 let Some(readbuf) = &self_ref.readbuf else {
216 throw_unsup_format!("reading from the write end of a pipe")
219 };
220
221 if readbuf.borrow_mut().buf.is_empty() {
222 if self_ref.peer_fd().upgrade().is_none() {
223 return finish.call(ecx, Ok(0));
226 } else if self_ref.is_nonblock {
227 return finish.call(ecx, Err(ErrorKind::WouldBlock.into()));
233 } else {
234 self_ref.blocked_read_tid.borrow_mut().push(ecx.active_thread());
235 let weak_self_ref = FileDescriptionRef::downgrade(&self_ref);
238 ecx.block_thread(
239 BlockReason::UnnamedSocket,
240 None,
241 callback!(
242 @capture<'tcx> {
243 weak_self_ref: WeakFileDescriptionRef<AnonSocket>,
244 ptr: Pointer,
245 len: usize,
246 finish: DynMachineCallback<'tcx, Result<usize, IoError>>,
247 }
248 |this, unblock: UnblockKind| {
249 assert_eq!(unblock, UnblockKind::Ready);
250 let self_ref = weak_self_ref.upgrade().unwrap();
253 anonsocket_read(self_ref, ptr, len, this, finish)
254 }
255 ),
256 );
257 }
258 } else {
259 let mut readbuf = readbuf.borrow_mut();
261 ecx.acquire_clock(&readbuf.clock);
265
266 let read_size = ecx.read_from_host(&mut readbuf.buf, len, ptr)?.unwrap();
269
270 drop(readbuf);
272
273 if let Some(peer_fd) = self_ref.peer_fd().upgrade() {
281 let waiting_threads = std::mem::take(&mut *peer_fd.blocked_write_tid.borrow_mut());
283 for thread_id in waiting_threads {
285 ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
286 }
287 ecx.check_and_update_readiness(peer_fd)?;
289 };
290
291 return finish.call(ecx, Ok(read_size));
292 }
293 interp_ok(())
294}
295
296impl UnixFileDescription for AnonSocket {
297 fn get_epoll_ready_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadyEvents> {
298 let mut epoll_ready_events = EpollReadyEvents::new();
302
303 if let Some(readbuf) = &self.readbuf {
305 if !readbuf.borrow().buf.is_empty() {
306 epoll_ready_events.epollin = true;
307 }
308 } else {
309 epoll_ready_events.epollin = true;
311 }
312
313 if let Some(peer_fd) = self.peer_fd().upgrade() {
315 if let Some(writebuf) = &peer_fd.readbuf {
316 let data_size = writebuf.borrow().buf.len();
317 let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
318 if available_space != 0 {
319 epoll_ready_events.epollout = true;
320 }
321 } else {
322 epoll_ready_events.epollout = true;
324 }
325 } else {
326 epoll_ready_events.epollrdhup = true;
329 epoll_ready_events.epollhup = true;
330 epoll_ready_events.epollin = true;
334 epoll_ready_events.epollout = true;
335 if self.peer_lost_data.get() {
337 epoll_ready_events.epollerr = true;
338 }
339 }
340 interp_ok(epoll_ready_events)
341 }
342}
343
344impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
345pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
346 fn socketpair(
349 &mut self,
350 domain: &OpTy<'tcx>,
351 type_: &OpTy<'tcx>,
352 protocol: &OpTy<'tcx>,
353 sv: &OpTy<'tcx>,
354 ) -> InterpResult<'tcx, Scalar> {
355 let this = self.eval_context_mut();
356
357 let domain = this.read_scalar(domain)?.to_i32()?;
358 let mut flags = this.read_scalar(type_)?.to_i32()?;
359 let protocol = this.read_scalar(protocol)?.to_i32()?;
360 let sv = this.deref_pointer_as(sv, this.machine.layouts.i32)?;
362
363 let mut is_sock_nonblock = false;
364
365 if this.tcx.sess.target.os == "linux" {
368 let sock_nonblock = this.eval_libc_i32("SOCK_NONBLOCK");
370 let sock_cloexec = this.eval_libc_i32("SOCK_CLOEXEC");
371 if flags & sock_nonblock == sock_nonblock {
372 is_sock_nonblock = true;
373 flags &= !sock_nonblock;
374 }
375 if flags & sock_cloexec == sock_cloexec {
376 flags &= !sock_cloexec;
377 }
378 }
379
380 if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
384 throw_unsup_format!(
385 "socketpair: domain {:#x} is unsupported, only AF_UNIX \
386 and AF_LOCAL are allowed",
387 domain
388 );
389 } else if flags != this.eval_libc_i32("SOCK_STREAM") {
390 throw_unsup_format!(
391 "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
392 SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
393 flags
394 );
395 } else if protocol != 0 {
396 throw_unsup_format!(
397 "socketpair: socket protocol {protocol} is unsupported, \
398 only 0 is allowed",
399 );
400 }
401
402 let fds = &mut this.machine.fds;
404 let fd0 = fds.new_ref(AnonSocket {
405 readbuf: Some(RefCell::new(Buffer::new())),
406 peer_fd: OnceCell::new(),
407 peer_lost_data: Cell::new(false),
408 blocked_read_tid: RefCell::new(Vec::new()),
409 blocked_write_tid: RefCell::new(Vec::new()),
410 is_nonblock: is_sock_nonblock,
411 });
412 let fd1 = fds.new_ref(AnonSocket {
413 readbuf: Some(RefCell::new(Buffer::new())),
414 peer_fd: OnceCell::new(),
415 peer_lost_data: Cell::new(false),
416 blocked_read_tid: RefCell::new(Vec::new()),
417 blocked_write_tid: RefCell::new(Vec::new()),
418 is_nonblock: is_sock_nonblock,
419 });
420
421 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
423 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
424
425 let sv0 = fds.insert(fd0);
427 let sv1 = fds.insert(fd1);
428
429 let sv0 = Scalar::from_int(sv0, sv.layout.size);
431 let sv1 = Scalar::from_int(sv1, sv.layout.size);
432 this.write_scalar(sv0, &sv)?;
433 this.write_scalar(sv1, &sv.offset(sv.layout.size, sv.layout, this)?)?;
434
435 interp_ok(Scalar::from_i32(0))
436 }
437
438 fn pipe2(
439 &mut self,
440 pipefd: &OpTy<'tcx>,
441 flags: Option<&OpTy<'tcx>>,
442 ) -> InterpResult<'tcx, Scalar> {
443 let this = self.eval_context_mut();
444
445 let pipefd = this.deref_pointer_as(pipefd, this.machine.layouts.i32)?;
446 let mut flags = match flags {
447 Some(flags) => this.read_scalar(flags)?.to_i32()?,
448 None => 0,
449 };
450
451 let cloexec = this.eval_libc_i32("O_CLOEXEC");
452 let o_nonblock = this.eval_libc_i32("O_NONBLOCK");
453
454 let mut is_nonblock = false;
457 if flags & o_nonblock == o_nonblock {
458 is_nonblock = true;
459 flags &= !o_nonblock;
460 }
461 if flags & cloexec == cloexec {
463 flags &= !cloexec;
464 }
465 if flags != 0 {
466 throw_unsup_format!("unsupported flags in `pipe2`");
467 }
468
469 let fds = &mut this.machine.fds;
472 let fd0 = fds.new_ref(AnonSocket {
473 readbuf: Some(RefCell::new(Buffer::new())),
474 peer_fd: OnceCell::new(),
475 peer_lost_data: Cell::new(false),
476 blocked_read_tid: RefCell::new(Vec::new()),
477 blocked_write_tid: RefCell::new(Vec::new()),
478 is_nonblock,
479 });
480 let fd1 = fds.new_ref(AnonSocket {
481 readbuf: None,
482 peer_fd: OnceCell::new(),
483 peer_lost_data: Cell::new(false),
484 blocked_read_tid: RefCell::new(Vec::new()),
485 blocked_write_tid: RefCell::new(Vec::new()),
486 is_nonblock,
487 });
488
489 fd0.peer_fd.set(FileDescriptionRef::downgrade(&fd1)).unwrap();
491 fd1.peer_fd.set(FileDescriptionRef::downgrade(&fd0)).unwrap();
492
493 let pipefd0 = fds.insert(fd0);
495 let pipefd1 = fds.insert(fd1);
496
497 let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
499 let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
500 this.write_scalar(pipefd0, &pipefd)?;
501 this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
502
503 interp_ok(Scalar::from_i32(0))
504 }
505}