std/sync/mpmc/
waker.rs

1//! Waking mechanism for threads blocked on channel operations.
2
3use super::context::Context;
4use super::select::{Operation, Selected};
5use crate::ptr;
6use crate::sync::Mutex;
7use crate::sync::atomic::{AtomicBool, Ordering};
8
9/// Represents a thread blocked on a specific channel operation.
10pub(crate) struct Entry {
11    /// The operation.
12    pub(crate) oper: Operation,
13
14    /// Optional packet.
15    pub(crate) packet: *mut (),
16
17    /// Context associated with the thread owning this operation.
18    pub(crate) cx: Context,
19}
20
21/// A queue of threads blocked on channel operations.
22///
23/// This data structure is used by threads to register blocking operations and get woken up once
24/// an operation becomes ready.
25pub(crate) struct Waker {
26    /// A list of select operations.
27    selectors: Vec<Entry>,
28
29    /// A list of operations waiting to be ready.
30    observers: Vec<Entry>,
31}
32
33impl Waker {
34    /// Creates a new `Waker`.
35    #[inline]
36    pub(crate) fn new() -> Self {
37        Waker { selectors: Vec::new(), observers: Vec::new() }
38    }
39
40    /// Registers a select operation.
41    #[inline]
42    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
43        self.register_with_packet(oper, ptr::null_mut(), cx);
44    }
45
46    /// Registers a select operation and a packet.
47    #[inline]
48    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
49        self.selectors.push(Entry { oper, packet, cx: cx.clone() });
50    }
51
52    /// Unregisters a select operation.
53    #[inline]
54    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
55        if let Some((i, _)) =
56            self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
57        {
58            let entry = self.selectors.remove(i);
59            Some(entry)
60        } else {
61            None
62        }
63    }
64
65    /// Attempts to find another thread's entry, select the operation, and wake it up.
66    #[inline]
67    pub(crate) fn try_select(&mut self) -> Option<Entry> {
68        if self.selectors.is_empty() {
69            None
70        } else {
71            let thread_id = current_thread_id();
72
73            self.selectors
74                .iter()
75                .position(|selector| {
76                    // Does the entry belong to a different thread?
77                    selector.cx.thread_id() != thread_id
78                        && selector // Try selecting this operation.
79                            .cx
80                            .try_select(Selected::Operation(selector.oper))
81                            .is_ok()
82                        && {
83                            // Provide the packet.
84                            selector.cx.store_packet(selector.packet);
85                            // Wake the thread up.
86                            selector.cx.unpark();
87                            true
88                        }
89                })
90                // Remove the entry from the queue to keep it clean and improve
91                // performance.
92                .map(|pos| self.selectors.remove(pos))
93        }
94    }
95
96    /// Notifies all operations waiting to be ready.
97    #[inline]
98    pub(crate) fn notify(&mut self) {
99        for entry in self.observers.drain(..) {
100            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
101                entry.cx.unpark();
102            }
103        }
104    }
105
106    /// Notifies all registered operations that the channel is disconnected.
107    #[inline]
108    pub(crate) fn disconnect(&mut self) {
109        for entry in self.selectors.iter() {
110            if entry.cx.try_select(Selected::Disconnected).is_ok() {
111                // Wake the thread up.
112                //
113                // Here we don't remove the entry from the queue. Registered threads must
114                // unregister from the waker by themselves. They might also want to recover the
115                // packet value and destroy it, if necessary.
116                entry.cx.unpark();
117            }
118        }
119
120        self.notify();
121    }
122}
123
124impl Drop for Waker {
125    #[inline]
126    fn drop(&mut self) {
127        debug_assert_eq!(self.selectors.len(), 0);
128        debug_assert_eq!(self.observers.len(), 0);
129    }
130}
131
132/// A waker that can be shared among threads without locking.
133///
134/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
135pub(crate) struct SyncWaker {
136    /// The inner `Waker`.
137    inner: Mutex<Waker>,
138
139    /// `true` if the waker is empty.
140    is_empty: AtomicBool,
141}
142
143impl SyncWaker {
144    /// Creates a new `SyncWaker`.
145    #[inline]
146    pub(crate) fn new() -> Self {
147        SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
148    }
149
150    /// Registers the current thread with an operation.
151    #[inline]
152    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
153        let mut inner = self.inner.lock().unwrap();
154        inner.register(oper, cx);
155        self.is_empty
156            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
157    }
158
159    /// Unregisters an operation previously registered by the current thread.
160    #[inline]
161    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
162        let mut inner = self.inner.lock().unwrap();
163        let entry = inner.unregister(oper);
164        self.is_empty
165            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
166        entry
167    }
168
169    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
170    #[inline]
171    pub(crate) fn notify(&self) {
172        if !self.is_empty.load(Ordering::SeqCst) {
173            let mut inner = self.inner.lock().unwrap();
174            if !self.is_empty.load(Ordering::SeqCst) {
175                inner.try_select();
176                inner.notify();
177                self.is_empty.store(
178                    inner.selectors.is_empty() && inner.observers.is_empty(),
179                    Ordering::SeqCst,
180                );
181            }
182        }
183    }
184
185    /// Notifies all threads that the channel is disconnected.
186    #[inline]
187    pub(crate) fn disconnect(&self) {
188        let mut inner = self.inner.lock().unwrap();
189        inner.disconnect();
190        self.is_empty
191            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
192    }
193}
194
195impl Drop for SyncWaker {
196    #[inline]
197    fn drop(&mut self) {
198        debug_assert!(self.is_empty.load(Ordering::SeqCst));
199    }
200}
201
202/// Returns a unique id for the current thread.
203#[inline]
204pub fn current_thread_id() -> usize {
205    // `u8` is not drop so this variable will be available during thread destruction,
206    // whereas `thread::current()` would not be
207    thread_local! { static DUMMY: u8 = const { 0 } }
208    DUMMY.with(|x| (x as *const u8).addr())
209}