use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

use crate::job::JobRef;
use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
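///
/// ## Example
///
/// A minimal sketch of the intended protocol (illustrative only; `latch`
/// stands for any concrete latch in this module, and probing is shown for
/// latches that expose `probe()` via [`AsCoreLatch`]):
///
/// ```ignore
/// // Thread B publishes its results, then sets the latch exactly once:
/// unsafe { Latch::set(&latch) };
///
/// // Thread A (the owner) may probe any number of times; once `probe()`
/// // returns true, everything written before `set()` is visible to it:
/// while !latch.as_core_latch().probe() {
///     // spin, steal work, or go to sleep via the sleep module
/// }
/// ```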
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
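
// A sketch of the transitions between these states (derived from the methods
// below, not an exhaustive specification): the owning thread moves
// `UNSET -> SLEEPY` in `get_sleepy()` and `SLEEPY -> SLEEPING` in
// `fall_asleep()`; `wake_up()` moves `SLEEPING -> UNSET` if the latch was not
// set in the meantime; and `set()` moves any state to `SET`.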

/// The core of all latches: an atomic state that tracks both whether the
/// latch has been set and whether its owning thread is awake, sleepy, or
/// asleep. It has no `wait()` operation of its own; blocking and wakeups are
/// coordinated by the surrounding latch types and the registry's sleep
/// machinery.
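///
/// A simplified sketch of how the owning thread and a setting thread drive
/// these methods (illustrative only; the real coordination lives in the sleep
/// module and the registry):
///
/// ```ignore
/// // Owning thread, before blocking:
/// if latch.get_sleepy() && latch.fall_asleep() {
///     // ... block until notified ...
/// }
/// latch.wake_up();
///
/// // Setting thread:
/// if unsafe { CoreLatch::set(&latch) } {
///     // the owner was asleep and must be notified via its registry
/// }
/// ```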
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self { state: AtomicUsize::new(UNSET) }
    }

    /// Invoked by the owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed).is_ok()
    }

    /// Invoked by the owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed).is_ok()
    }

    /// Invoked by the owning thread as it wakes up, whether or not the latch
    /// was set. If the thread was recorded as sleeping and the latch has not
    /// been set in the meantime, this resets the state back to `UNSET`.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state.compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = unsafe { (*this).state.swap(SET, Ordering::AcqRel) };
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
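///
/// A usage sketch (illustrative only; `worker` is a `&WorkerThread` from the
/// surrounding code):
///
/// ```ignore
/// let latch = SpinLatch::new(worker);    // wakes `worker` when set
/// let cross = SpinLatch::cross(worker);  // safe to set from another registry
/// // ... later, when the associated job completes:
/// unsafe { Latch::set(&latch) };
/// ```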
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch { cross: true, ..SpinLatch::new(thread) }
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if unsafe { (*this).cross } {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone(unsafe { (*this).registry });
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            unsafe { (*this).registry }
        };
        let target_worker_index = unsafe { (*this).target_worker_index };

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if unsafe { CoreLatch::set(&(*this).core_latch) } {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
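///
/// A usage sketch (illustrative only):
///
/// ```ignore
/// let latch = LockLatch::new();
/// std::thread::scope(|s| {
///     s.spawn(|| unsafe { Latch::set(&latch) });
///     latch.wait(); // blocks on the condvar until the spawned thread sets the latch
/// });
/// // `wait_and_reset()` additionally clears the flag so the latch can be reused.
/// ```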
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch { m: Mutex::new(false), v: Condvar::new() }
    }

    /// Block until the latch is set, then reset it so it can be reused again.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = unsafe { (*this).m.lock().unwrap() };
        *guard = true;
        unsafe { (*this).v.notify_all() };
    }
}

/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
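///
/// A usage sketch (illustrative only; `registry: &Registry` and
/// `worker_index` come from the surrounding code):
///
/// ```ignore
/// let latch = OnceLatch::new();
/// // ... later, from code that holds the owning registry:
/// unsafe { OnceLatch::set_and_tickle_one(&latch, &registry, worker_index) };
/// ```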
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self { core_latch: CoreLatch::new() }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if unsafe { CoreLatch::set(&(*this).core_latch) } {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch considered set; instead, it just
/// decrements the counter. The latch only counts as "set" (in the
/// sense that `probe()` returns true) once the counter reaches zero.
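///
/// A usage sketch (illustrative only; `owner` is an `Option<&WorkerThread>`
/// from the surrounding code):
///
/// ```ignore
/// let latch = CountLatch::new(owner); // counter starts at 1
/// latch.increment();                  // a job was spawned: counter = 2
/// // ... each completed job calls `set()`, decrementing the counter:
/// unsafe { Latch::set(&latch) };      // counter = 1
/// unsafe { Latch::set(&latch) };      // counter = 0: the latch is now set
/// ```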
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}

impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking { latch: LockLatch::new() },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(
        &self,
        owner: Option<&WorkerThread>,
        all_jobs_started: impl FnMut() -> bool,
        is_job: impl FnMut(&JobRef) -> bool,
    ) {
        match &self.kind {
            CountLatchKind::Stealing { latch, registry, worker_index } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_for_jobs::<_, true>(latch, all_jobs_started, is_job, |job| {
                    owner.execute(job);
                });
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if unsafe { (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 } {
            // SAFETY: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match unsafe { &(*this).kind } {
                CountLatchKind::Stealing { latch, registry, worker_index } => {
                    let registry = Arc::clone(registry);
                    let worker_index = *worker_index;
                    // SAFETY: We don't use any references from `this` after this call.
                    if unsafe { CoreLatch::set(latch) } {
                        // We **must not** access any part of `this` anymore, which
                        // is why we read and shadowed these fields beforehand.
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { latch } => unsafe { LockLatch::set(latch) },
            }
        }
    }
}

/// `&L` without any implication of `dereferenceable` for `Latch::set`
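///
/// A usage sketch (illustrative only): pass a borrowed latch where an owned
/// `L: Latch` is expected, without promising that the reference remains
/// dereferenceable for the entire `set` call.
///
/// ```ignore
/// let latch = LockLatch::new();
/// let latch_ref = LatchRef::new(&latch);
/// unsafe { Latch::set(&latch_ref) }; // forwards to `LockLatch::set`
/// ```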
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef { inner, marker: PhantomData }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        unsafe { L::set((*this).inner) };
    }
}