std/sync/mpmc/
zero.rs

//! Zero-capacity channel.
//!
//! This kind of channel is also known as a *rendezvous* channel.
4
5use super::context::Context;
6use super::error::*;
7use super::select::{Operation, Selected, Token};
8use super::utils::Backoff;
9use super::waker::Waker;
10use crate::cell::UnsafeCell;
11use crate::marker::PhantomData;
12use crate::sync::Mutex;
13use crate::sync::atomic::{AtomicBool, Ordering};
14use crate::time::Instant;
15use crate::{fmt, ptr};
16
/// Type-erased pointer to the packet selected for a zero-capacity channel operation.
///
/// The default token holds a null pointer. `Channel::read` and `Channel::write` interpret a
/// null token as "no packet", i.e. the channel is disconnected.
pub(crate) struct ZeroToken(*mut ());

impl Default for ZeroToken {
    /// Returns a token that refers to no packet (a null pointer).
    fn default() -> Self {
        ZeroToken(ptr::null_mut())
    }
}

impl fmt::Debug for ZeroToken {
    /// Formats the token as the numeric address it holds, honouring the formatter's flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as usize;
        fmt::Debug::fmt(&address, f)
    }
}
31
32/// A slot for passing one message from a sender to a receiver.
33struct Packet<T> {
34    /// Equals `true` if the packet is allocated on the stack.
35    on_stack: bool,
36
37    /// Equals `true` once the packet is ready for reading or writing.
38    ready: AtomicBool,
39
40    /// The message.
41    msg: UnsafeCell<Option<T>>,
42}
43
44impl<T> Packet<T> {
45    /// Creates an empty packet on the stack.
46    fn empty_on_stack() -> Packet<T> {
47        Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
48    }
49
50    /// Creates a packet on the stack, containing a message.
51    fn message_on_stack(msg: T) -> Packet<T> {
52        Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
53    }
54
55    /// Waits until the packet becomes ready for reading or writing.
56    fn wait_ready(&self) {
57        let backoff = Backoff::new();
58        while !self.ready.load(Ordering::Acquire) {
59            backoff.spin_heavy();
60        }
61    }
62}
63
64/// Inner representation of a zero-capacity channel.
65struct Inner {
66    /// Senders waiting to pair up with a receive operation.
67    senders: Waker,
68
69    /// Receivers waiting to pair up with a send operation.
70    receivers: Waker,
71
72    /// Equals `true` when the channel is disconnected.
73    is_disconnected: bool,
74}
75
76/// Zero-capacity channel.
77pub(crate) struct Channel<T> {
78    /// Inner representation of the channel.
79    inner: Mutex<Inner>,
80
81    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
82    _marker: PhantomData<T>,
83}
84
85impl<T> Channel<T> {
86    /// Constructs a new zero-capacity channel.
87    pub(crate) fn new() -> Self {
88        Channel {
89            inner: Mutex::new(Inner {
90                senders: Waker::new(),
91                receivers: Waker::new(),
92                is_disconnected: false,
93            }),
94            _marker: PhantomData,
95        }
96    }
97
98    /// Writes a message into the packet.
99    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
100        // If there is no packet, the channel is disconnected.
101        if token.zero.0.is_null() {
102            return Err(msg);
103        }
104
105        unsafe {
106            let packet = &*(token.zero.0 as *const Packet<T>);
107            packet.msg.get().write(Some(msg));
108            packet.ready.store(true, Ordering::Release);
109        }
110        Ok(())
111    }
112
113    /// Reads a message from the packet.
114    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
115        // If there is no packet, the channel is disconnected.
116        if token.zero.0.is_null() {
117            return Err(());
118        }
119
120        let packet = unsafe { &*(token.zero.0 as *const Packet<T>) };
121
122        if packet.on_stack {
123            // The message has been in the packet from the beginning, so there is no need to wait
124            // for it. However, after reading the message, we need to set `ready` to `true` in
125            // order to signal that the packet can be destroyed.
126            let msg = unsafe { packet.msg.get().replace(None) }.unwrap();
127            packet.ready.store(true, Ordering::Release);
128            Ok(msg)
129        } else {
130            // Wait until the message becomes available, then read it and destroy the
131            // heap-allocated packet.
132            packet.wait_ready();
133            unsafe {
134                let msg = packet.msg.get().replace(None).unwrap();
135                drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
136                Ok(msg)
137            }
138        }
139    }
140
141    /// Attempts to send a message into the channel.
142    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
143        let token = &mut Token::default();
144        let mut inner = self.inner.lock().unwrap();
145
146        // If there's a waiting receiver, pair up with it.
147        if let Some(operation) = inner.receivers.try_select() {
148            token.zero.0 = operation.packet;
149            drop(inner);
150            unsafe {
151                self.write(token, msg).ok().unwrap();
152            }
153            Ok(())
154        } else if inner.is_disconnected {
155            Err(TrySendError::Disconnected(msg))
156        } else {
157            Err(TrySendError::Full(msg))
158        }
159    }
160
161    /// Sends a message into the channel.
162    pub(crate) fn send(
163        &self,
164        msg: T,
165        deadline: Option<Instant>,
166    ) -> Result<(), SendTimeoutError<T>> {
167        let token = &mut Token::default();
168        let mut inner = self.inner.lock().unwrap();
169
170        // If there's a waiting receiver, pair up with it.
171        if let Some(operation) = inner.receivers.try_select() {
172            token.zero.0 = operation.packet;
173            drop(inner);
174            unsafe {
175                self.write(token, msg).ok().unwrap();
176            }
177            return Ok(());
178        }
179
180        if inner.is_disconnected {
181            return Err(SendTimeoutError::Disconnected(msg));
182        }
183
184        Context::with(|cx| {
185            // Prepare for blocking until a receiver wakes us up.
186            let oper = Operation::hook(token);
187            let mut packet = Packet::<T>::message_on_stack(msg);
188            inner.senders.register_with_packet(oper, (&raw mut packet) as *mut (), cx);
189            inner.receivers.notify();
190            drop(inner);
191
192            // Block the current thread.
193            // SAFETY: the context belongs to the current thread.
194            let sel = unsafe { cx.wait_until(deadline) };
195
196            match sel {
197                Selected::Waiting => unreachable!(),
198                Selected::Aborted => {
199                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
200                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
201                    Err(SendTimeoutError::Timeout(msg))
202                }
203                Selected::Disconnected => {
204                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
205                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
206                    Err(SendTimeoutError::Disconnected(msg))
207                }
208                Selected::Operation(_) => {
209                    // Wait until the message is read, then drop the packet.
210                    packet.wait_ready();
211                    Ok(())
212                }
213            }
214        })
215    }
216
217    /// Attempts to receive a message without blocking.
218    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
219        let token = &mut Token::default();
220        let mut inner = self.inner.lock().unwrap();
221
222        // If there's a waiting sender, pair up with it.
223        if let Some(operation) = inner.senders.try_select() {
224            token.zero.0 = operation.packet;
225            drop(inner);
226            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
227        } else if inner.is_disconnected {
228            Err(TryRecvError::Disconnected)
229        } else {
230            Err(TryRecvError::Empty)
231        }
232    }
233
234    /// Receives a message from the channel.
235    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
236        let token = &mut Token::default();
237        let mut inner = self.inner.lock().unwrap();
238
239        // If there's a waiting sender, pair up with it.
240        if let Some(operation) = inner.senders.try_select() {
241            token.zero.0 = operation.packet;
242            drop(inner);
243            unsafe {
244                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
245            }
246        }
247
248        if inner.is_disconnected {
249            return Err(RecvTimeoutError::Disconnected);
250        }
251
252        Context::with(|cx| {
253            // Prepare for blocking until a sender wakes us up.
254            let oper = Operation::hook(token);
255            let mut packet = Packet::<T>::empty_on_stack();
256            inner.receivers.register_with_packet(oper, (&raw mut packet) as *mut (), cx);
257            inner.senders.notify();
258            drop(inner);
259
260            // Block the current thread.
261            // SAFETY: the context belongs to the current thread.
262            let sel = unsafe { cx.wait_until(deadline) };
263
264            match sel {
265                Selected::Waiting => unreachable!(),
266                Selected::Aborted => {
267                    self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
268                    Err(RecvTimeoutError::Timeout)
269                }
270                Selected::Disconnected => {
271                    self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
272                    Err(RecvTimeoutError::Disconnected)
273                }
274                Selected::Operation(_) => {
275                    // Wait until the message is provided, then read it.
276                    packet.wait_ready();
277                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
278                }
279            }
280        })
281    }
282
283    /// Disconnects the channel and wakes up all blocked senders and receivers.
284    ///
285    /// Returns `true` if this call disconnected the channel.
286    pub(crate) fn disconnect(&self) -> bool {
287        let mut inner = self.inner.lock().unwrap();
288
289        if !inner.is_disconnected {
290            inner.is_disconnected = true;
291            inner.senders.disconnect();
292            inner.receivers.disconnect();
293            true
294        } else {
295            false
296        }
297    }
298
299    /// Returns the current number of messages inside the channel.
300    pub(crate) fn len(&self) -> usize {
301        0
302    }
303
304    /// Returns the capacity of the channel.
305    #[allow(clippy::unnecessary_wraps)] // This is intentional.
306    pub(crate) fn capacity(&self) -> Option<usize> {
307        Some(0)
308    }
309
310    /// Returns `true` if the channel is empty.
311    pub(crate) fn is_empty(&self) -> bool {
312        true
313    }
314
315    /// Returns `true` if the channel is full.
316    pub(crate) fn is_full(&self) -> bool {
317        true
318    }
319}