// std/sys/sync/once/futex.rs
use crate::cell::Cell;
use crate::sync as public;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};
// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 4 states:
/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
/// Some thread has previously attempted to initialize the Once, but it panicked,
/// so the Once is now poisoned. There are no other threads currently accessing
/// this Once.
///
/// This state is also stored when the initialization closure returns normally
/// after having called `OnceState::poison`.
const POISONED: u32 = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 3;
// An additional bit indicates whether there are waiting threads:
/// Flag bit kept in the same word as the state, just above the two bits
/// selected by `STATE_MASK`.
///
/// May only be set if the state is not COMPLETE.
const QUEUED: u32 = 4;
// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.
/// Mask that extracts the state (one of the four values above) from the
/// combined state-and-queued word, dropping the `QUEUED` bit.
const STATE_MASK: u32 = 0b11;
/// Per-run state that `Once::call` wraps in a `public::OnceState` and passes
/// to the initialization closure.
pub struct OnceState {
/// Whether the `Once` was in the POISONED state when this run started.
poisoned: bool,
/// State that `Once::call` stores into the `Once` after the closure returns.
/// `Once::call` initializes it to COMPLETE; `poison` changes it to POISONED.
set_state_to: Cell<u32>,
}
impl OnceState {
    /// Returns `true` if the `Once` was poisoned when the current
    /// initialization run began.
    #[inline]
    pub fn is_poisoned(&self) -> bool {
        let Self { poisoned, .. } = self;
        *poisoned
    }

    /// Requests that the `Once` be left in the POISONED state once the
    /// initialization closure returns, instead of being marked complete.
    #[inline]
    pub fn poison(&self) {
        // Only the pending final state is recorded here; the caller that
        // created this `OnceState` reads it back after the closure finishes.
        let Self { set_state_to, .. } = self;
        set_state_to.set(POISONED);
    }
}
/// Guard held by the thread running the initialization closure in
/// `Once::call`. Its `Drop` impl stores `set_state_on_drop_to` into the
/// referenced word and wakes waiting threads if the QUEUED bit was set.
struct CompletionGuard<'a> {
/// The state-and-queued word of the `Once` being initialized.
state_and_queued: &'a AtomicU32,
/// State written when the guard is dropped. `Once::call` creates the guard
/// with POISONED and overwrites this field only after the closure has
/// returned, so a drop before that point leaves the `Once` poisoned.
set_state_on_drop_to: u32,
}
impl Drop for CompletionGuard<'_> {
    /// Publishes the final state of the `Once` and wakes any queued waiters.
    fn drop(&mut self) {
        // Release ordering makes everything the initializing thread wrote
        // visible to threads that subsequently observe the new state. The
        // wake-up call performs its own synchronization, so `AcqRel` is not
        // required for the swap.
        let previous = self.state_and_queued.swap(self.set_state_on_drop_to, Release);

        // The swap also clears the QUEUED bit; only issue the wake-up when
        // some thread had actually registered itself as waiting.
        let has_waiters = previous & QUEUED != 0;
        if has_waiters {
            futex_wake_all(self.state_and_queued);
        }
    }
}
/// One-time initialization primitive backed by a single futex word.
pub struct Once {
/// Holds one of INCOMPLETE / POISONED / RUNNING / COMPLETE in the bits
/// selected by `STATE_MASK`, plus the QUEUED flag bit.
state_and_queued: AtomicU32,
}
impl Once {
/// Creates a `Once` in the INCOMPLETE state with the QUEUED bit clear.
#[inline]
pub const fn new() -> Once {
    let state_and_queued = AtomicU32::new(INCOMPLETE);
    Once { state_and_queued }
}
#[inline]
pub fn is_completed(&self) -> bool {
// Use acquire ordering to make all initialization changes visible to the
// current thread.
self.state_and_queued.load(Acquire) == COMPLETE
}
/// Reads the current state through exclusive access.
///
/// # Panics
///
/// Panics if the stored word is anything other than INCOMPLETE, POISONED or
/// COMPLETE (for example RUNNING, or any value with the QUEUED bit set).
#[inline]
pub(crate) fn state(&mut self) -> ExclusiveState {
    // `&mut self` gives direct, non-atomic access to the word.
    let raw = *self.state_and_queued.get_mut();
    match raw {
        COMPLETE => ExclusiveState::Complete,
        POISONED => ExclusiveState::Poisoned,
        INCOMPLETE => ExclusiveState::Incomplete,
        _ => unreachable!("invalid Once state"),
    }
}
/// Overwrites the current state through exclusive access.
#[inline]
pub(crate) fn set_state(&mut self, new_state: ExclusiveState) {
    // Translate the public-facing enum into its raw encoding first, then
    // store it directly; `&mut self` makes an atomic operation unnecessary.
    let raw = match new_state {
        ExclusiveState::Complete => COMPLETE,
        ExclusiveState::Poisoned => POISONED,
        ExclusiveState::Incomplete => INCOMPLETE,
    };
    *self.state_and_queued.get_mut() = raw;
}
/// Blocks the calling thread until the `Once` reaches the COMPLETE state.
///
/// This function never runs an initializer itself; it only registers the
/// caller as a waiter (QUEUED bit) and sleeps on the futex word until the
/// word changes.
///
/// # Panics
///
/// Panics if the `Once` is observed in the POISONED state and
/// `ignore_poisoning` is `false`.
#[cold]
#[track_caller]
pub fn wait(&self, ignore_poisoning: bool) {
let mut state_and_queued = self.state_and_queued.load(Acquire);
loop {
// Split the observed word into the state and the QUEUED flag.
let state = state_and_queued & STATE_MASK;
let queued = state_and_queued & QUEUED != 0;
match state {
COMPLETE => return,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
_ => {
// Reached for INCOMPLETE, RUNNING, and POISONED when
// `ignore_poisoning` is set: in all of these the caller has to
// wait for the word to change.
// Set the QUEUED bit if it has not already been set.
if !queued {
state_and_queued += QUEUED;
// QUEUED was clear, so `state` is exactly the word that was
// observed and serves as the expected value for the CAS.
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state,
state_and_queued,
Relaxed,
Acquire,
) {
// The word changed in the meantime (or the weak CAS failed
// spuriously); start over with the freshly observed value.
state_and_queued = new;
continue;
}
}
// Sleep on the word, passing the value we expect it to still hold.
// NOTE(review): presumably `futex_wait` returns immediately if the
// word no longer equals that value — confirm in `crate::sys::futex`.
futex_wait(&self.state_and_queued, state_and_queued, None);
// Re-read the word after waking and re-evaluate from the top.
state_and_queued = self.state_and_queued.load(Acquire);
}
}
}
}
/// Runs `f` if no initialization has completed yet, or waits for the thread
/// that is currently running one.
///
/// - COMPLETE: returns immediately.
/// - INCOMPLETE, or POISONED with `ignore_poisoning` set: tries to claim the
///   RUNNING state and, on success, calls `f` with a `public::OnceState`.
///   When `f` returns, the state becomes COMPLETE, or POISONED if `f` called
///   `OnceState::poison`. If `f` unwinds, the guard's drop stores POISONED.
/// - RUNNING: sets the QUEUED bit, sleeps on the futex word, then
///   re-evaluates the state.
///
/// # Panics
///
/// Panics if the `Once` is observed in the POISONED state and
/// `ignore_poisoning` is `false`.
#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
let mut state_and_queued = self.state_and_queued.load(Acquire);
loop {
// Split the observed word into the state and the QUEUED flag.
let state = state_and_queued & STATE_MASK;
let queued = state_and_queued & QUEUED != 0;
match state {
COMPLETE => return,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
// POISONED only reaches this arm when `ignore_poisoning` is true,
// because the arm above panics otherwise.
INCOMPLETE | POISONED => {
// Try to register the current thread as the one running.
// An already-set QUEUED bit is carried over so that existing
// waiters are still woken when the guard is dropped.
let next = RUNNING + if queued { QUEUED } else { 0 };
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state_and_queued,
next,
Acquire,
Acquire,
) {
// Lost the race (or the weak CAS failed spuriously); re-evaluate
// with the freshly observed value.
state_and_queued = new;
continue;
}
// `waiter_queue` will manage other waiting threads, and
// wake them up on drop.
// It starts out as POISONED so that an unwind out of `f` leaves
// the `Once` poisoned.
let mut waiter_queue = CompletionGuard {
state_and_queued: &self.state_and_queued,
set_state_on_drop_to: POISONED,
};
// Run the function, letting it know if we're poisoned or not.
let f_state = public::OnceState {
inner: OnceState {
poisoned: state == POISONED,
set_state_to: Cell::new(COMPLETE),
},
};
f(&f_state);
// `f` returned normally: use the state it selected (COMPLETE unless
// it called `poison`). Returning drops `waiter_queue`, which stores
// that state and wakes any queued threads.
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
return;
}
_ => {
// All other values must be RUNNING.
assert!(state == RUNNING);
// Set the QUEUED bit if it is not already set.
if !queued {
state_and_queued += QUEUED;
// QUEUED was clear, so `state` is exactly the word that was
// observed and serves as the expected value for the CAS.
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state,
state_and_queued,
Relaxed,
Acquire,
) {
// The word changed in the meantime (or the weak CAS failed
// spuriously); start over with the freshly observed value.
state_and_queued = new;
continue;
}
}
// Sleep on the word, passing the value we expect it to still hold.
// NOTE(review): presumably `futex_wait` returns immediately if the
// word no longer equals that value — confirm in `crate::sys::futex`.
futex_wait(&self.state_and_queued, state_and_queued, None);
// Re-read the word after waking and re-evaluate from the top.
state_and_queued = self.state_and_queued.load(Acquire);
}
}
}
}
}