std/sync/mpmc/
context.rs
1use super::select::Selected;
4use super::waker::current_thread_id;
5use crate::cell::Cell;
6use crate::ptr;
7use crate::sync::Arc;
8use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
9use crate::thread::{self, Thread};
10use crate::time::Instant;
11
12#[derive(Debug, Clone)]
14pub struct Context {
15 inner: Arc<Inner>,
16}
17
18#[derive(Debug)]
20struct Inner {
21 select: AtomicUsize,
23
24 packet: AtomicPtr<()>,
26
27 thread: Thread,
29
30 thread_id: usize,
32}
33
34impl Context {
35 #[inline]
37 pub fn with<F, R>(f: F) -> R
38 where
39 F: FnOnce(&Context) -> R,
40 {
41 thread_local! {
42 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
44 }
45
46 let mut f = Some(f);
47 let mut f = |cx: &Context| -> R {
48 let f = f.take().unwrap();
49 f(cx)
50 };
51
52 CONTEXT
53 .try_with(|cell| match cell.take() {
54 None => f(&Context::new()),
55 Some(cx) => {
56 cx.reset();
57 let res = f(&cx);
58 cell.set(Some(cx));
59 res
60 }
61 })
62 .unwrap_or_else(|_| f(&Context::new()))
63 }
64
65 #[cold]
67 fn new() -> Context {
68 Context {
69 inner: Arc::new(Inner {
70 select: AtomicUsize::new(Selected::Waiting.into()),
71 packet: AtomicPtr::new(ptr::null_mut()),
72 thread: thread::current_or_unnamed(),
73 thread_id: current_thread_id(),
74 }),
75 }
76 }
77
78 #[inline]
80 fn reset(&self) {
81 self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
82 self.inner.packet.store(ptr::null_mut(), Ordering::Release);
83 }
84
85 #[inline]
89 pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
90 self.inner
91 .select
92 .compare_exchange(
93 Selected::Waiting.into(),
94 select.into(),
95 Ordering::AcqRel,
96 Ordering::Acquire,
97 )
98 .map(|_| ())
99 .map_err(|e| e.into())
100 }
101
102 #[inline]
106 pub fn store_packet(&self, packet: *mut ()) {
107 if !packet.is_null() {
108 self.inner.packet.store(packet, Ordering::Release);
109 }
110 }
111
112 #[inline]
119 pub unsafe fn wait_until(&self, deadline: Option<Instant>) -> Selected {
120 loop {
121 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
123 if sel != Selected::Waiting {
124 return sel;
125 }
126
127 if let Some(end) = deadline {
129 let now = Instant::now();
130
131 if now < end {
132 unsafe { self.inner.thread.park_timeout(end - now) };
134 } else {
135 return match self.try_select(Selected::Aborted) {
137 Ok(()) => Selected::Aborted,
138 Err(s) => s,
139 };
140 }
141 } else {
142 unsafe { self.inner.thread.park() };
144 }
145 }
146 }
147
148 #[inline]
150 pub fn unpark(&self) {
151 self.inner.thread.unpark();
152 }
153
154 #[inline]
156 pub fn thread_id(&self) -> usize {
157 self.inner.thread_id
158 }
159}