1use std::sync::atomic::{AtomicUsize, Ordering};
23pub(super) struct AtomicCounters {
4/// Packs together a number of counters. The counters are ordered as
5 /// follows, from least to most significant bits (here, we assuming
6 /// that [`THREADS_BITS`] is equal to 10):
7 ///
8 /// * Bits 0..10: Stores the number of **sleeping threads**
9 /// * Bits 10..20: Stores the number of **inactive threads**
10 /// * Bits 20..: Stores the **job event counter** (JEC)
11 ///
12 /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
13 /// that the total number of bits (and hence the number of bits used for the
14 /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
15value: AtomicUsize,
16}
1718#[derive(#[automatically_derived]
impl ::core::marker::Copy for Counters { }Copy, #[automatically_derived]
impl ::core::clone::Clone for Counters {
#[inline]
fn clone(&self) -> Counters {
let _: ::core::clone::AssertParamIsClone<usize>;
*self
}
}Clone)]
19pub(super) struct Counters {
20 word: usize,
21}
2223/// A value read from the **Jobs Event Counter**.
24/// See the [`README.md`](README.md) for more
25/// coverage of how the jobs event counter works.
26#[derive(#[automatically_derived]
impl ::core::marker::Copy for JobsEventCounter { }Copy, #[automatically_derived]
impl ::core::clone::Clone for JobsEventCounter {
#[inline]
fn clone(&self) -> JobsEventCounter {
let _: ::core::clone::AssertParamIsClone<usize>;
*self
}
}Clone, #[automatically_derived]
impl ::core::fmt::Debug for JobsEventCounter {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
::core::fmt::Formatter::debug_tuple_field1_finish(f,
"JobsEventCounter", &&self.0)
}
}Debug, #[automatically_derived]
impl ::core::cmp::PartialEq for JobsEventCounter {
#[inline]
fn eq(&self, other: &JobsEventCounter) -> bool { self.0 == other.0 }
}PartialEq, #[automatically_derived]
impl ::core::cmp::PartialOrd for JobsEventCounter {
#[inline]
fn partial_cmp(&self, other: &JobsEventCounter)
-> ::core::option::Option<::core::cmp::Ordering> {
::core::cmp::PartialOrd::partial_cmp(&self.0, &other.0)
}
}PartialOrd)]
27pub(super) struct JobsEventCounter(usize);
2829impl JobsEventCounter {
30pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(usize::MAX);
3132#[inline]
33pub(super) fn as_usize(self) -> usize {
34self.0
35}
3637/// The JEC "is sleepy" if the last thread to increment it was in the
38 /// process of becoming sleepy. This is indicated by its value being *even*.
39 /// When new jobs are posted, they check if the JEC is sleepy, and if so
40 /// they incremented it.
41#[inline]
42pub(super) fn is_sleepy(self) -> bool {
43 (self.as_usize() & 1) == 0
44}
4546/// The JEC "is active" if the last thread to increment it was posting new
47 /// work. This is indicated by its value being *odd*. When threads get
48 /// sleepy, they will check if the JEC is active, and increment it.
49#[inline]
50pub(super) fn is_active(self) -> bool {
51 !self.is_sleepy()
52 }
53}
5455/// Number of bits used for the thread counters.
56#[cfg(target_pointer_width = "64")]
57const THREADS_BITS: usize = 16;
5859#[cfg(target_pointer_width = "32")]
60const THREADS_BITS: usize = 8;
6162/// Bits to shift to select the sleeping threads
63/// (used with `select_bits`).
64#[allow(clippy::erasing_op)]
65const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
6667/// Bits to shift to select the inactive threads
68/// (used with `select_bits`).
69#[allow(clippy::identity_op)]
70const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
7172/// Bits to shift to select the JEC
73/// (use JOBS_BITS).
74const JEC_SHIFT: usize = 2 * THREADS_BITS;
7576/// Max value for the thread counters.
77pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
7879/// Constant that can be added to add one sleeping thread.
80const ONE_SLEEPING: usize = 1;
8182/// Constant that can be added to add one inactive thread.
83/// An inactive thread is either idle, sleepy, or sleeping.
84const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
8586/// Constant that can be added to add one to the JEC.
87const ONE_JEC: usize = 1 << JEC_SHIFT;
8889impl AtomicCounters {
90#[inline]
91pub(super) fn new() -> AtomicCounters {
92AtomicCounters { value: AtomicUsize::new(0) }
93 }
9495/// Load and return the current value of the various counters.
96 /// This value can then be given to other method which will
97 /// attempt to update the counters via compare-and-swap.
98#[inline]
99pub(super) fn load(&self, ordering: Ordering) -> Counters {
100Counters::new(self.value.load(ordering))
101 }
102103#[inline]
104fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
105self.value
106 .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
107 .is_ok()
108 }
109110/// Adds an inactive thread. This cannot fail.
111 ///
112 /// This should be invoked when a thread enters its idle loop looking
113 /// for work. It is decremented when work is found. Note that it is
114 /// not decremented if the thread transitions from idle to sleepy or sleeping;
115 /// so the number of inactive threads is always greater-than-or-equal
116 /// to the number of sleeping threads.
117#[inline]
118pub(super) fn add_inactive_thread(&self) {
119self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
120 }
121122/// Increments the jobs event counter if `increment_when`, when applied to
123 /// the current value, is true. Used to toggle the JEC from even (sleepy) to
124 /// odd (active) or vice versa. Returns the final value of the counters, for
125 /// which `increment_when` is guaranteed to return false.
126pub(super) fn increment_jobs_event_counter_if(
127&self,
128 increment_when: impl Fn(JobsEventCounter) -> bool,
129 ) -> Counters {
130loop {
131let old_value = self.load(Ordering::SeqCst);
132if increment_when(old_value.jobs_counter()) {
133let new_value = old_value.increment_jobs_counter();
134if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
135return new_value;
136 }
137 } else {
138return old_value;
139 }
140 }
141 }
142143/// Subtracts an inactive thread. This cannot fail. It is invoked
144 /// when a thread finds work and hence becomes active. It returns the
145 /// number of sleeping threads to wake up (if any).
146 ///
147 /// See `add_inactive_thread`.
148#[inline]
149pub(super) fn sub_inactive_thread(&self) -> usize {
150let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
151if true {
if !(old_value.inactive_threads() > 0) {
{
::core::panicking::panic_fmt(format_args!("sub_inactive_thread: old_value {0:?} has no inactive threads",
old_value));
}
};
};debug_assert!(
152 old_value.inactive_threads() > 0,
153"sub_inactive_thread: old_value {:?} has no inactive threads",
154 old_value,
155 );
156if true {
if !(old_value.sleeping_threads() <= old_value.inactive_threads()) {
{
::core::panicking::panic_fmt(format_args!("sub_inactive_thread: old_value {0:?} had {1} sleeping threads and {2} inactive threads",
old_value, old_value.sleeping_threads(),
old_value.inactive_threads()));
}
};
};debug_assert!(
157 old_value.sleeping_threads() <= old_value.inactive_threads(),
158"sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
159 old_value,
160 old_value.sleeping_threads(),
161 old_value.inactive_threads(),
162 );
163164// Current heuristic: whenever an inactive thread goes away, if
165 // there are any sleeping threads, wake 'em up.
166let sleeping_threads = old_value.sleeping_threads();
167 Ord::min(sleeping_threads, 2)
168 }
169170/// Subtracts a sleeping thread. This cannot fail, but it is only
171 /// safe to do if you you know the number of sleeping threads is
172 /// non-zero (i.e., because you have just awoken a sleeping
173 /// thread).
174#[inline]
175pub(super) fn sub_sleeping_thread(&self) {
176let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
177if true {
if !(old_value.sleeping_threads() > 0) {
{
::core::panicking::panic_fmt(format_args!("sub_sleeping_thread: old_value {0:?} had no sleeping threads",
old_value));
}
};
};debug_assert!(
178 old_value.sleeping_threads() > 0,
179"sub_sleeping_thread: old_value {:?} had no sleeping threads",
180 old_value,
181 );
182if true {
if !(old_value.sleeping_threads() <= old_value.inactive_threads()) {
{
::core::panicking::panic_fmt(format_args!("sub_sleeping_thread: old_value {0:?} had {1} sleeping threads and {2} inactive threads",
old_value, old_value.sleeping_threads(),
old_value.inactive_threads()));
}
};
};debug_assert!(
183 old_value.sleeping_threads() <= old_value.inactive_threads(),
184"sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
185 old_value,
186 old_value.sleeping_threads(),
187 old_value.inactive_threads(),
188 );
189 }
190191#[inline]
192pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
193if true {
if !(old_value.inactive_threads() > 0) {
{
::core::panicking::panic_fmt(format_args!("try_add_sleeping_thread: old_value {0:?} has no inactive threads",
old_value));
}
};
};debug_assert!(
194 old_value.inactive_threads() > 0,
195"try_add_sleeping_thread: old_value {:?} has no inactive threads",
196 old_value,
197 );
198if true {
if !(old_value.sleeping_threads() < THREADS_MAX) {
{
::core::panicking::panic_fmt(format_args!("try_add_sleeping_thread: old_value {0:?} has too many sleeping threads",
old_value));
}
};
};debug_assert!(
199 old_value.sleeping_threads() < THREADS_MAX,
200"try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
201 old_value,
202 );
203204let mut new_value = old_value;
205new_value.word += ONE_SLEEPING;
206207self.try_exchange(old_value, new_value, Ordering::SeqCst)
208 }
209}
210211#[inline]
212fn select_thread(word: usize, shift: usize) -> usize {
213 (word >> shift) & THREADS_MAX214}
215216#[inline]
217fn select_jec(word: usize) -> usize {
218word >> JEC_SHIFT219}
220221impl Counters {
222#[inline]
223fn new(word: usize) -> Counters {
224Counters { word }
225 }
226227#[inline]
228fn increment_jobs_counter(self) -> Counters {
229// We can freely add to JEC because it occupies the most significant bits.
230 // Thus it doesn't overflow into the other counters, just wraps itself.
231Counters { word: self.word.wrapping_add(ONE_JEC) }
232 }
233234#[inline]
235pub(super) fn jobs_counter(self) -> JobsEventCounter {
236JobsEventCounter(select_jec(self.word))
237 }
238239/// The number of threads that are not actively
240 /// executing work. They may be idle, sleepy, or asleep.
241#[inline]
242pub(super) fn inactive_threads(self) -> usize {
243select_thread(self.word, INACTIVE_SHIFT)
244 }
245246#[inline]
247pub(super) fn awake_but_idle_threads(self) -> usize {
248if true {
if !(self.sleeping_threads() <= self.inactive_threads()) {
{
::core::panicking::panic_fmt(format_args!("sleeping threads: {0} > raw idle threads {1}",
self.sleeping_threads(), self.inactive_threads()));
}
};
};debug_assert!(
249self.sleeping_threads() <= self.inactive_threads(),
250"sleeping threads: {} > raw idle threads {}",
251self.sleeping_threads(),
252self.inactive_threads()
253 );
254self.inactive_threads() - self.sleeping_threads()
255 }
256257#[inline]
258pub(super) fn sleeping_threads(self) -> usize {
259select_thread(self.word, SLEEPING_SHIFT)
260 }
261}
262263impl std::fmt::Debugfor Counters {
264fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265let word = ::alloc::__export::must_use({
::alloc::fmt::format(format_args!("{0:016x}", self.word))
})format!("{:016x}", self.word);
266fmt.debug_struct("Counters")
267 .field("word", &word)
268 .field("jobs", &self.jobs_counter().0)
269 .field("inactive", &self.inactive_threads())
270 .field("sleeping", &self.sleeping_threads())
271 .finish()
272 }
273}