std/sys/sync/once/
futex.rs

1use crate::cell::Cell;
2use crate::sync as public;
3use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
4use crate::sync::poison::once::ExclusiveState;
5use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all};
6
7// On some platforms, the OS is very nice and handles the waiter queue for us.
8// This means we only need one atomic value with 4 states:
9
10/// No initialization has run yet, and no thread is currently using the Once.
11const INCOMPLETE: Primitive = 0;
12/// Some thread has previously attempted to initialize the Once, but it panicked,
13/// so the Once is now poisoned. There are no other threads currently accessing
14/// this Once.
15const POISONED: Primitive = 1;
16/// Some thread is currently attempting to run initialization. It may succeed,
17/// so all future threads need to wait for it to finish.
18const RUNNING: Primitive = 2;
19/// Initialization has completed and all future calls should finish immediately.
20const COMPLETE: Primitive = 3;
21
22// An additional bit indicates whether there are waiting threads:
23
24/// May only be set if the state is not COMPLETE.
25const QUEUED: Primitive = 4;
26
27// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
28// variable. When the running thread finishes, it will wake all waiting threads using
29// `futex_wake_all`.
30
31const STATE_MASK: Primitive = 0b11;
32
33pub struct OnceState {
34    poisoned: bool,
35    set_state_to: Cell<Primitive>,
36}
37
38impl OnceState {
39    #[inline]
40    pub fn is_poisoned(&self) -> bool {
41        self.poisoned
42    }
43
44    #[inline]
45    pub fn poison(&self) {
46        self.set_state_to.set(POISONED);
47    }
48}
49
50struct CompletionGuard<'a> {
51    state_and_queued: &'a Futex,
52    set_state_on_drop_to: Primitive,
53}
54
55impl<'a> Drop for CompletionGuard<'a> {
56    fn drop(&mut self) {
57        // Use release ordering to propagate changes to all threads checking
58        // up on the Once. `futex_wake_all` does its own synchronization, hence
59        // we do not need `AcqRel`.
60        if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
61            futex_wake_all(self.state_and_queued);
62        }
63    }
64}
65
66pub struct Once {
67    state_and_queued: Futex,
68}
69
70impl Once {
71    #[inline]
72    pub const fn new() -> Once {
73        Once { state_and_queued: Futex::new(INCOMPLETE) }
74    }
75
76    #[inline]
77    pub fn is_completed(&self) -> bool {
78        // Use acquire ordering to make all initialization changes visible to the
79        // current thread.
80        self.state_and_queued.load(Acquire) == COMPLETE
81    }
82
83    #[inline]
84    pub(crate) fn state(&mut self) -> ExclusiveState {
85        match *self.state_and_queued.get_mut() {
86            INCOMPLETE => ExclusiveState::Incomplete,
87            POISONED => ExclusiveState::Poisoned,
88            COMPLETE => ExclusiveState::Complete,
89            _ => unreachable!("invalid Once state"),
90        }
91    }
92
93    #[inline]
94    pub(crate) fn set_state(&mut self, new_state: ExclusiveState) {
95        *self.state_and_queued.get_mut() = match new_state {
96            ExclusiveState::Incomplete => INCOMPLETE,
97            ExclusiveState::Poisoned => POISONED,
98            ExclusiveState::Complete => COMPLETE,
99        };
100    }
101
102    #[cold]
103    #[track_caller]
104    pub fn wait(&self, ignore_poisoning: bool) {
105        let mut state_and_queued = self.state_and_queued.load(Acquire);
106        loop {
107            let state = state_and_queued & STATE_MASK;
108            let queued = state_and_queued & QUEUED != 0;
109            match state {
110                COMPLETE => return,
111                POISONED if !ignore_poisoning => {
112                    // Panic to propagate the poison.
113                    panic!("Once instance has previously been poisoned");
114                }
115                _ => {
116                    // Set the QUEUED bit if it has not already been set.
117                    if !queued {
118                        state_and_queued += QUEUED;
119                        if let Err(new) = self.state_and_queued.compare_exchange_weak(
120                            state,
121                            state_and_queued,
122                            Relaxed,
123                            Acquire,
124                        ) {
125                            state_and_queued = new;
126                            continue;
127                        }
128                    }
129
130                    futex_wait(&self.state_and_queued, state_and_queued, None);
131                    state_and_queued = self.state_and_queued.load(Acquire);
132                }
133            }
134        }
135    }
136
137    #[cold]
138    #[track_caller]
139    pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
140        let mut state_and_queued = self.state_and_queued.load(Acquire);
141        loop {
142            let state = state_and_queued & STATE_MASK;
143            let queued = state_and_queued & QUEUED != 0;
144            match state {
145                COMPLETE => return,
146                POISONED if !ignore_poisoning => {
147                    // Panic to propagate the poison.
148                    panic!("Once instance has previously been poisoned");
149                }
150                INCOMPLETE | POISONED => {
151                    // Try to register the current thread as the one running.
152                    let next = RUNNING + if queued { QUEUED } else { 0 };
153                    if let Err(new) = self.state_and_queued.compare_exchange_weak(
154                        state_and_queued,
155                        next,
156                        Acquire,
157                        Acquire,
158                    ) {
159                        state_and_queued = new;
160                        continue;
161                    }
162
163                    // `waiter_queue` will manage other waiting threads, and
164                    // wake them up on drop.
165                    let mut waiter_queue = CompletionGuard {
166                        state_and_queued: &self.state_and_queued,
167                        set_state_on_drop_to: POISONED,
168                    };
169                    // Run the function, letting it know if we're poisoned or not.
170                    let f_state = public::OnceState {
171                        inner: OnceState {
172                            poisoned: state == POISONED,
173                            set_state_to: Cell::new(COMPLETE),
174                        },
175                    };
176                    f(&f_state);
177                    waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
178                    return;
179                }
180                _ => {
181                    // All other values must be RUNNING.
182                    assert!(state == RUNNING);
183
184                    // Set the QUEUED bit if it is not already set.
185                    if !queued {
186                        state_and_queued += QUEUED;
187                        if let Err(new) = self.state_and_queued.compare_exchange_weak(
188                            state,
189                            state_and_queued,
190                            Relaxed,
191                            Acquire,
192                        ) {
193                            state_and_queued = new;
194                            continue;
195                        }
196                    }
197
198                    futex_wait(&self.state_and_queued, state_and_queued, None);
199                    state_and_queued = self.state_and_queued.load(Acquire);
200                }
201            }
202        }
203    }
204}