std/sync/mpmc/
waker.rs
1use super::context::Context;
4use super::select::{Operation, Selected};
5use crate::ptr;
6use crate::sync::Mutex;
7use crate::sync::atomic::{AtomicBool, Ordering};
8
9pub(crate) struct Entry {
11 pub(crate) oper: Operation,
13
14 pub(crate) packet: *mut (),
16
17 pub(crate) cx: Context,
19}
20
21pub(crate) struct Waker {
26 selectors: Vec<Entry>,
28
29 observers: Vec<Entry>,
31}
32
33impl Waker {
34 #[inline]
36 pub(crate) fn new() -> Self {
37 Waker { selectors: Vec::new(), observers: Vec::new() }
38 }
39
40 #[inline]
42 pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
43 self.register_with_packet(oper, ptr::null_mut(), cx);
44 }
45
46 #[inline]
48 pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
49 self.selectors.push(Entry { oper, packet, cx: cx.clone() });
50 }
51
52 #[inline]
54 pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
55 if let Some((i, _)) =
56 self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
57 {
58 let entry = self.selectors.remove(i);
59 Some(entry)
60 } else {
61 None
62 }
63 }
64
65 #[inline]
67 pub(crate) fn try_select(&mut self) -> Option<Entry> {
68 if self.selectors.is_empty() {
69 None
70 } else {
71 let thread_id = current_thread_id();
72
73 self.selectors
74 .iter()
75 .position(|selector| {
76 selector.cx.thread_id() != thread_id
78 && selector .cx
80 .try_select(Selected::Operation(selector.oper))
81 .is_ok()
82 && {
83 selector.cx.store_packet(selector.packet);
85 selector.cx.unpark();
87 true
88 }
89 })
90 .map(|pos| self.selectors.remove(pos))
93 }
94 }
95
96 #[inline]
98 pub(crate) fn notify(&mut self) {
99 for entry in self.observers.drain(..) {
100 if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
101 entry.cx.unpark();
102 }
103 }
104 }
105
106 #[inline]
108 pub(crate) fn disconnect(&mut self) {
109 for entry in self.selectors.iter() {
110 if entry.cx.try_select(Selected::Disconnected).is_ok() {
111 entry.cx.unpark();
117 }
118 }
119
120 self.notify();
121 }
122}
123
124impl Drop for Waker {
125 #[inline]
126 fn drop(&mut self) {
127 debug_assert_eq!(self.selectors.len(), 0);
128 debug_assert_eq!(self.observers.len(), 0);
129 }
130}
131
132pub(crate) struct SyncWaker {
136 inner: Mutex<Waker>,
138
139 is_empty: AtomicBool,
141}
142
143impl SyncWaker {
144 #[inline]
146 pub(crate) fn new() -> Self {
147 SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
148 }
149
150 #[inline]
152 pub(crate) fn register(&self, oper: Operation, cx: &Context) {
153 let mut inner = self.inner.lock().unwrap();
154 inner.register(oper, cx);
155 self.is_empty
156 .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
157 }
158
159 #[inline]
161 pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
162 let mut inner = self.inner.lock().unwrap();
163 let entry = inner.unregister(oper);
164 self.is_empty
165 .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
166 entry
167 }
168
169 #[inline]
171 pub(crate) fn notify(&self) {
172 if !self.is_empty.load(Ordering::SeqCst) {
173 let mut inner = self.inner.lock().unwrap();
174 if !self.is_empty.load(Ordering::SeqCst) {
175 inner.try_select();
176 inner.notify();
177 self.is_empty.store(
178 inner.selectors.is_empty() && inner.observers.is_empty(),
179 Ordering::SeqCst,
180 );
181 }
182 }
183 }
184
185 #[inline]
187 pub(crate) fn disconnect(&self) {
188 let mut inner = self.inner.lock().unwrap();
189 inner.disconnect();
190 self.is_empty
191 .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
192 }
193}
194
195impl Drop for SyncWaker {
196 #[inline]
197 fn drop(&mut self) {
198 debug_assert!(self.is_empty.load(Ordering::SeqCst));
199 }
200}
201
202#[inline]
204pub fn current_thread_id() -> usize {
205 thread_local! { static DUMMY: u8 = const { 0 } }
208 DUMMY.with(|x| (x as *const u8).addr())
209}