1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
34use std::sync::atomic::Ordering;
5use std::sync::{Condvar, Mutex};
6use std::thread;
78use crossbeam_utils::CachePadded;
910use crate::DeadlockHandler;
11use crate::latch::CoreLatch;
12use crate::registry::WorkerThread;
1314mod counters;
15pub(crate) use self::counters::THREADS_MAX;
16use self::counters::{AtomicCounters, JobsEventCounter};
1718struct SleepData {
19/// The number of threads in the thread pool.
20worker_count: usize,
2122/// The number of threads in the thread pool which are running and
23 /// aren't blocked in user code or sleeping.
24active_threads: usize,
2526/// The number of threads which are blocked in user code.
27 /// This doesn't include threads blocked by this module.
28blocked_threads: usize,
29}
3031impl SleepData {
32/// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33#[inline]
34pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35if self.active_threads == 0 && self.blocked_threads > 0 {
36 (deadlock_handler.as_ref().unwrap())();
37 }
38 }
39}
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Atomic counters. The methods of `Sleep` use them to record inactive and
    /// sleeping threads and to read and bump the jobs event counter.
    counters: AtomicCounters,

    /// Active/blocked thread bookkeeping that feeds the deadlock check.
    data: Mutex<SleepData>,
}
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// The worker index of the idle thread.
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the jobs event counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true by the worker right before it blocks in `Sleep::sleep`;
    /// set back to false by the thread that wakes it.
    is_blocked: Mutex<bool>,

    /// Condition variable paired with `is_blocked`: the sleeping worker waits on it
    /// and the waking thread signals it with `notify_one`.
    condvar: Condvar,
}
/// Number of fruitless rounds (each ending in a `thread::yield_now`) after which
/// `Sleep::no_work_found` announces the worker as sleepy.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// Round count at which `Sleep::no_work_found` stops yielding and calls `Sleep::sleep`.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
8788impl Sleep {
89pub(super) fn new(n_threads: usize) -> Sleep {
90if !(n_threads <= THREADS_MAX) {
::core::panicking::panic("assertion failed: n_threads <= THREADS_MAX")
};assert!(n_threads <= THREADS_MAX);
91Sleep {
92 worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93 counters: AtomicCounters::new(),
94 data: Mutex::new(SleepData {
95 worker_count: n_threads,
96 active_threads: n_threads,
97 blocked_threads: 0,
98 }),
99 }
100 }
101102/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103 /// if no other worker thread is active
104#[inline]
105pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106let mut data = self.data.lock().unwrap();
107if true {
if !(data.active_threads > 0) {
::core::panicking::panic("assertion failed: data.active_threads > 0")
};
};debug_assert!(data.active_threads > 0);
108if true {
if !(data.blocked_threads < data.worker_count) {
::core::panicking::panic("assertion failed: data.blocked_threads < data.worker_count")
};
};debug_assert!(data.blocked_threads < data.worker_count);
109data.active_threads -= 1;
110data.blocked_threads += 1;
111112data.deadlock_check(deadlock_handler);
113 }
114115/// Mark a previously blocked Rayon worker thread as unblocked
116#[inline]
117pub(super) fn mark_unblocked(&self) {
118let mut data = self.data.lock().unwrap();
119if true {
if !(data.active_threads < data.worker_count) {
::core::panicking::panic("assertion failed: data.active_threads < data.worker_count")
};
};debug_assert!(data.active_threads < data.worker_count);
120if true {
if !(data.blocked_threads > 0) {
::core::panicking::panic("assertion failed: data.blocked_threads > 0")
};
};debug_assert!(data.blocked_threads > 0);
121data.active_threads += 1;
122data.blocked_threads -= 1;
123 }
124125#[inline]
126pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
127self.counters.add_inactive_thread();
128129IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
130 }
131132#[inline]
133pub(super) fn work_found(&self) {
134// If we were the last idle thread and other threads are still sleeping,
135 // then we should wake up another thread.
136let threads_to_wake = self.counters.sub_inactive_thread();
137self.wake_any_threads(threads_to_wakeas u32);
138 }
139140#[inline]
141pub(super) fn no_work_found(
142&self,
143 idle_state: &mut IdleState,
144 latch: &CoreLatch,
145 thread: &WorkerThread,
146 steal: bool,
147 ) {
148if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
149 thread::yield_now();
150idle_state.rounds += 1;
151 } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
152idle_state.jobs_counter = self.announce_sleepy();
153idle_state.rounds += 1;
154 thread::yield_now();
155 } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
156idle_state.rounds += 1;
157 thread::yield_now();
158 } else {
159if true {
match (&idle_state.rounds, &ROUNDS_UNTIL_SLEEPING) {
(left_val, right_val) => {
if !(*left_val == *right_val) {
let kind = ::core::panicking::AssertKind::Eq;
::core::panicking::assert_failed(kind, &*left_val,
&*right_val, ::core::option::Option::None);
}
}
};
};debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
160self.sleep(idle_state, latch, thread, steal);
161 }
162 }
163164#[cold]
165fn announce_sleepy(&self) -> JobsEventCounter {
166self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
167 }
/// Tries to put the calling worker to sleep until another thread wakes it through
/// `wake_specific_thread`.
///
/// Returns without blocking when
///
/// - `latch.get_sleepy()` or `latch.fall_asleep()` returns false,
/// - the jobs event counter no longer matches the value recorded in `idle_state`
///   (the idle state is then only partly reset so the worker searches once more), or
/// - `steal` is true and `thread.has_injected_job()` reports an injected job.
///
/// Otherwise the worker is registered as sleeping, removed from the active thread
/// count (running the deadlock check) and blocked on its condition variable.
///
/// # Parameters
///
/// - `idle_state` -- idle bookkeeping of the calling worker; reset before returning
///   on every path except the initial `get_sleepy` bail-out.
/// - `latch` -- the latch the worker is waiting on.
/// - `thread` -- the calling worker thread.
/// - `steal` -- whether the final check for injected jobs should be performed.
#[cold]
fn sleep(
    &self,
    idle_state: &mut IdleState,
    latch: &CoreLatch,
    thread: &WorkerThread,
    steal: bool,
) {
    let worker_index = idle_state.worker_index;

    // NOTE(review): presumably `get_sleepy` fails when the latch has already been
    // set -- confirm against `CoreLatch`.
    if !latch.get_sleepy() {
        return;
    }

    let sleep_state = &self.worker_sleep_states[worker_index];
    let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
    debug_assert!(!*is_blocked);

    // If `fall_asleep` fails, our latch was signalled. We should wake back up
    // fully as we will have some stuff to do.
    if !latch.fall_asleep() {
        idle_state.wake_fully();
        return;
    }

    loop {
        let counters = self.counters.load(Ordering::SeqCst);

        // Check if the JEC has changed since we got sleepy.
        debug_assert!(idle_state.jobs_counter.is_sleepy());
        if counters.jobs_counter() != idle_state.jobs_counter {
            // JEC has changed, so a new job was posted, but for some reason
            // we didn't see it. We should return to just before the SLEEPY
            // state so we can do another search and (if we fail to find
            // work) go back to sleep.
            idle_state.wake_partly();
            latch.wake_up();
            return;
        }

        // Otherwise, let's move from IDLE to SLEEPING. If the update fails we
        // reload the counters and try again.
        if self.counters.try_add_sleeping_thread(counters) {
            break;
        }
    }

    // Successfully registered as asleep.

    // We have one last check for injected jobs to do. This protects against
    // deadlock in the very unlikely event that
    //
    // - an external job is being injected while we are sleepy
    // - that job triggers the rollover over the JEC such that we don't see it
    // - we are the last active worker thread
    std::sync::atomic::fence(Ordering::SeqCst);
    if steal && thread.has_injected_job() {
        // If we see an externally injected job, then we have to 'wake
        // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
        // the one that wakes us.)
        self.counters.sub_sleeping_thread();
    } else {
        {
            // Decrement the number of active threads and check for a deadlock.
            // The inner block releases the `data` lock before we block below.
            let mut data = self.data.lock().unwrap();
            data.active_threads -= 1;
            data.deadlock_check(&thread.registry.deadlock_handler);
        }

        // If we don't see an injected job (the normal case), then flag
        // ourselves as asleep and wait till we are notified.
        //
        // (Note that `is_blocked` is held under a mutex and the mutex was
        // acquired *before* we incremented the "sleepy counter". This means
        // that whomever is coming to wake us will have to wait until we
        // release the mutex in the call to `wait`, so they will see this
        // boolean as true.)
        thread.registry.release_thread();
        *is_blocked = true;
        // Loop to guard against spurious wakeups: only a waker clearing
        // `is_blocked` ends the wait.
        while *is_blocked {
            is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
        }

        // Drop `is_blocked` now in case `acquire_thread` blocks
        drop(is_blocked);

        thread.registry.acquire_thread();
    }

    // Update other state:
    idle_state.wake_fully();
    latch.wake_up();
}
261262/// Notify the given thread that it should wake up (if it is
263 /// sleeping). When this method is invoked, we typically know the
264 /// thread is asleep, though in rare cases it could have been
265 /// awoken by (e.g.) new work having been posted.
266pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
267self.wake_specific_thread(target_worker_index);
268 }
269270/// Signals that `num_jobs` new jobs were injected into the thread
271 /// pool from outside. This function will ensure that there are
272 /// threads available to process them, waking threads from sleep
273 /// if necessary.
274 ///
275 /// # Parameters
276 ///
277 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
278 /// We'll try to get at least one thread per job.
279#[inline]
280pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
281// This fence is needed to guarantee that threads
282 // as they are about to fall asleep, observe any
283 // new jobs that may have been injected.
284std::sync::atomic::fence(Ordering::SeqCst);
285286self.new_jobs(num_jobs, queue_was_empty)
287 }
288289/// Signals that `num_jobs` new jobs were pushed onto a thread's
290 /// local deque. This function will try to ensure that there are
291 /// threads available to process them, waking threads from sleep
292 /// if necessary. However, this is not guaranteed: under certain
293 /// race conditions, the function may fail to wake any new
294 /// threads; in that case the existing thread should eventually
295 /// pop the job.
296 ///
297 /// # Parameters
298 ///
299 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
300 /// We'll try to get at least one thread per job.
301#[inline]
302pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
303self.new_jobs(num_jobs, queue_was_empty)
304 }
305306/// Common helper for `new_injected_jobs` and `new_internal_jobs`.
307#[inline]
308fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
309// Read the counters and -- if sleepy workers have announced themselves
310 // -- announce that there is now work available. The final value of `counters`
311 // with which we exit the loop thus corresponds to a state when
312let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
313let num_awake_but_idle = counters.awake_but_idle_threads();
314let num_sleepers = counters.sleeping_threads();
315316if num_sleepers == 0 {
317// nobody to wake
318return;
319 }
320321// Promote from u16 to u32 so we can interoperate with
322 // num_jobs more easily.
323let num_awake_but_idle = num_awake_but_idleas u32;
324let num_sleepers = num_sleepersas u32;
325326// If the queue is non-empty, then we always wake up a worker
327 // -- clearly the existing idle jobs aren't enough. Otherwise,
328 // check to see if we have enough idle workers.
329if !queue_was_empty {
330let num_to_wake = Ord::min(num_jobs, num_sleepers);
331self.wake_any_threads(num_to_wake);
332 } else if num_awake_but_idle < num_jobs {
333let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
334self.wake_any_threads(num_to_wake);
335 }
336 }
337338#[cold]
339fn wake_any_threads(&self, mut num_to_wake: u32) {
340if num_to_wake > 0 {
341for i in 0..self.worker_sleep_states.len() {
342if self.wake_specific_thread(i) {
343 num_to_wake -= 1;
344if num_to_wake == 0 {
345return;
346 }
347 }
348 }
349 }
350 }
351352fn wake_specific_thread(&self, index: usize) -> bool {
353let sleep_state = &self.worker_sleep_states[index];
354355let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
356if *is_blocked {
357*is_blocked = false;
358359// Increment the number of active threads
360self.data.lock().unwrap().active_threads += 1;
361362sleep_state.condvar.notify_one();
363364// When the thread went to sleep, it will have incremented
365 // this value. When we wake it, its our job to decrement
366 // it. We could have the thread do it, but that would
367 // introduce a delay between when the thread was
368 // *notified* and when this counter was decremented. That
369 // might mislead people with new work into thinking that
370 // there are sleeping threads that they should try to
371 // wake, when in fact there is nothing left for them to
372 // do.
373self.counters.sub_sleeping_thread();
374375true
376} else {
377false
378}
379 }
380}
381382impl IdleState {
383fn wake_fully(&mut self) {
384self.rounds = 0;
385self.jobs_counter = JobsEventCounter::DUMMY;
386 }
387388fn wake_partly(&mut self) {
389self.rounds = ROUNDS_UNTIL_SLEEPY;
390self.jobs_counter = JobsEventCounter::DUMMY;
391 }
392}