//! Code that decides when workers should go to sleep. See README.md
//! for an overview.

use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex};
use std::thread;

use crossbeam_utils::CachePadded;

use crate::DeadlockHandler;
use crate::latch::CoreLatch;
use crate::registry::WorkerThread;

mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};
/// Thread bookkeeping consulted by [`SleepData::deadlock_check`].
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}
3031impl SleepData {
32/// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33#[inline]
34pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35if self.active_threads == 0 && self.blocked_threads > 0 {
36 (deadlock_handler.as_ref().unwrap())();
37 }
38 }
39}
4041/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
42/// of workers. It has callbacks that are invoked periodically at significant events,
43/// such as when workers are looping and looking for work, when latches are set, or when
44/// jobs are published, and it either blocks threads or wakes them in response to these
45/// events. See the [`README.md`] in this module for more details.
46///
47/// [`README.md`]: README.md
48pub(super) struct Sleep {
49/// One "sleep state" per worker. Used to track if a worker is sleeping and to have
50 /// them block.
51worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
5253 counters: AtomicCounters,
5455 data: Mutex<SleepData>,
56}
5758/// An instance of this struct is created when a thread becomes idle.
59/// It is consumed when the thread finds work, and passed by `&mut`
60/// reference for operations that preserve the idle state. (In other
61/// words, producing one of these structs is evidence the thread is
62/// idle.) It tracks state such as how long the thread has been idle.
63pub(super) struct IdleState {
64/// What is worker index of the idle thread?
65worker_index: usize,
6667/// How many rounds have we been circling without sleeping?
68rounds: u32,
6970/// Once we become sleepy, what was the sleepy counter value?
71 /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
72jobs_counter: JobsEventCounter,
73}
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable the worker waits on while `is_blocked` is true.
    condvar: Condvar,
}
/// Number of fruitless search rounds (each followed by a yield) an idle
/// worker goes through before it announces itself as sleepy.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// Round count at which an idle worker stops yielding and tries to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
8788impl Sleep {
89pub(super) fn new(n_threads: usize) -> Sleep {
90if !(n_threads <= THREADS_MAX) {
::core::panicking::panic("assertion failed: n_threads <= THREADS_MAX")
};assert!(n_threads <= THREADS_MAX);
91Sleep {
92 worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93 counters: AtomicCounters::new(),
94 data: Mutex::new(SleepData {
95 worker_count: n_threads,
96 active_threads: n_threads,
97 blocked_threads: 0,
98 }),
99 }
100 }
101102/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103 /// if no other worker thread is active
104#[inline]
105pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106let mut data = self.data.lock().unwrap();
107if true {
if !(data.active_threads > 0) {
::core::panicking::panic("assertion failed: data.active_threads > 0")
};
};debug_assert!(data.active_threads > 0);
108if true {
if !(data.blocked_threads < data.worker_count) {
::core::panicking::panic("assertion failed: data.blocked_threads < data.worker_count")
};
};debug_assert!(data.blocked_threads < data.worker_count);
109if true {
if !(data.active_threads > 0) {
::core::panicking::panic("assertion failed: data.active_threads > 0")
};
};debug_assert!(data.active_threads > 0);
110data.active_threads -= 1;
111data.blocked_threads += 1;
112113data.deadlock_check(deadlock_handler);
114 }
115116/// Mark a previously blocked Rayon worker thread as unblocked
117#[inline]
118pub(super) fn mark_unblocked(&self) {
119let mut data = self.data.lock().unwrap();
120if true {
if !(data.active_threads < data.worker_count) {
::core::panicking::panic("assertion failed: data.active_threads < data.worker_count")
};
};debug_assert!(data.active_threads < data.worker_count);
121if true {
if !(data.blocked_threads > 0) {
::core::panicking::panic("assertion failed: data.blocked_threads > 0")
};
};debug_assert!(data.blocked_threads > 0);
122data.active_threads += 1;
123data.blocked_threads -= 1;
124 }
125126#[inline]
127pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
128self.counters.add_inactive_thread();
129130IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
131 }
132133#[inline]
134pub(super) fn work_found(&self) {
135// If we were the last idle thread and other threads are still sleeping,
136 // then we should wake up another thread.
137let threads_to_wake = self.counters.sub_inactive_thread();
138self.wake_any_threads(threads_to_wakeas u32);
139 }
140141#[inline]
142pub(super) fn no_work_found(
143&self,
144 idle_state: &mut IdleState,
145 latch: &CoreLatch,
146 thread: &WorkerThread,
147 steal: bool,
148 ) {
149if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
150 thread::yield_now();
151idle_state.rounds += 1;
152 } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
153idle_state.jobs_counter = self.announce_sleepy();
154idle_state.rounds += 1;
155 thread::yield_now();
156 } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
157idle_state.rounds += 1;
158 thread::yield_now();
159 } else {
160if true {
match (&idle_state.rounds, &ROUNDS_UNTIL_SLEEPING) {
(left_val, right_val) => {
if !(*left_val == *right_val) {
let kind = ::core::panicking::AssertKind::Eq;
::core::panicking::assert_failed(kind, &*left_val,
&*right_val, ::core::option::Option::None);
}
}
};
};debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
161self.sleep(idle_state, latch, thread, steal);
162 }
163 }
164165#[cold]
166fn announce_sleepy(&self) -> JobsEventCounter {
167self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
168 }
169170#[cold]
171fn sleep(
172&self,
173 idle_state: &mut IdleState,
174 latch: &CoreLatch,
175 thread: &WorkerThread,
176 steal: bool,
177 ) {
178let worker_index = idle_state.worker_index;
179180if !latch.get_sleepy() {
181return;
182 }
183184let sleep_state = &self.worker_sleep_states[worker_index];
185let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
186if true {
if !!*is_blocked {
::core::panicking::panic("assertion failed: !*is_blocked")
};
};debug_assert!(!*is_blocked);
187188// Our latch was signalled. We should wake back up fully as we
189 // will have some stuff to do.
190if !latch.fall_asleep() {
191idle_state.wake_fully();
192return;
193 }
194195loop {
196let counters = self.counters.load(Ordering::SeqCst);
197198// Check if the JEC has changed since we got sleepy.
199if true {
if !idle_state.jobs_counter.is_sleepy() {
::core::panicking::panic("assertion failed: idle_state.jobs_counter.is_sleepy()")
};
};debug_assert!(idle_state.jobs_counter.is_sleepy());
200if counters.jobs_counter() != idle_state.jobs_counter {
201// JEC has changed, so a new job was posted, but for some reason
202 // we didn't see it. We should return to just before the SLEEPY
203 // state so we can do another search and (if we fail to find
204 // work) go back to sleep.
205idle_state.wake_partly();
206latch.wake_up();
207return;
208 }
209210// Otherwise, let's move from IDLE to SLEEPING.
211if self.counters.try_add_sleeping_thread(counters) {
212break;
213 }
214 }
215216// Successfully registered as asleep.
217218 // We have one last check for injected jobs to do. This protects against
219 // deadlock in the very unlikely event that
220 //
221 // - an external job is being injected while we are sleepy
222 // - that job triggers the rollover over the JEC such that we don't see it
223 // - we are the last active worker thread
224std::sync::atomic::fence(Ordering::SeqCst);
225if steal && thread.has_injected_job() {
226// If we see an externally injected job, then we have to 'wake
227 // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
228 // the one that wakes us.)
229self.counters.sub_sleeping_thread();
230 } else {
231 {
232// Decrement the number of active threads and check for a deadlock
233let mut data = self.data.lock().unwrap();
234data.active_threads -= 1;
235data.deadlock_check(&thread.registry.deadlock_handler);
236 }
237238// If we don't see an injected job (the normal case), then flag
239 // ourselves as asleep and wait till we are notified.
240 //
241 // (Note that `is_blocked` is held under a mutex and the mutex was
242 // acquired *before* we incremented the "sleepy counter". This means
243 // that whomever is coming to wake us will have to wait until we
244 // release the mutex in the call to `wait`, so they will see this
245 // boolean as true.)
246thread.registry.release_thread();
247*is_blocked = true;
248while *is_blocked {
249 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
250 }
251252// Drop `is_blocked` now in case `acquire_thread` blocks
253drop(is_blocked);
254255thread.registry.acquire_thread();
256 }
257258// Update other state:
259idle_state.wake_fully();
260latch.wake_up();
261 }
262263/// Notify the given thread that it should wake up (if it is
264 /// sleeping). When this method is invoked, we typically know the
265 /// thread is asleep, though in rare cases it could have been
266 /// awoken by (e.g.) new work having been posted.
267pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
268self.wake_specific_thread(target_worker_index);
269 }
270271/// Signals that `num_jobs` new jobs were injected into the thread
272 /// pool from outside. This function will ensure that there are
273 /// threads available to process them, waking threads from sleep
274 /// if necessary.
275 ///
276 /// # Parameters
277 ///
278 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
279 /// We'll try to get at least one thread per job.
280#[inline]
281pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
282// This fence is needed to guarantee that threads
283 // as they are about to fall asleep, observe any
284 // new jobs that may have been injected.
285std::sync::atomic::fence(Ordering::SeqCst);
286287self.new_jobs(num_jobs, queue_was_empty)
288 }
289290/// Signals that `num_jobs` new jobs were pushed onto a thread's
291 /// local deque. This function will try to ensure that there are
292 /// threads available to process them, waking threads from sleep
293 /// if necessary. However, this is not guaranteed: under certain
294 /// race conditions, the function may fail to wake any new
295 /// threads; in that case the existing thread should eventually
296 /// pop the job.
297 ///
298 /// # Parameters
299 ///
300 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
301 /// We'll try to get at least one thread per job.
302#[inline]
303pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
304self.new_jobs(num_jobs, queue_was_empty)
305 }
306307/// Common helper for `new_injected_jobs` and `new_internal_jobs`.
308#[inline]
309fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
310// Read the counters and -- if sleepy workers have announced themselves
311 // -- announce that there is now work available. The final value of `counters`
312 // with which we exit the loop thus corresponds to a state when
313let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
314let num_awake_but_idle = counters.awake_but_idle_threads();
315let num_sleepers = counters.sleeping_threads();
316317if num_sleepers == 0 {
318// nobody to wake
319return;
320 }
321322// Promote from u16 to u32 so we can interoperate with
323 // num_jobs more easily.
324let num_awake_but_idle = num_awake_but_idleas u32;
325let num_sleepers = num_sleepersas u32;
326327// If the queue is non-empty, then we always wake up a worker
328 // -- clearly the existing idle jobs aren't enough. Otherwise,
329 // check to see if we have enough idle workers.
330if !queue_was_empty {
331let num_to_wake = Ord::min(num_jobs, num_sleepers);
332self.wake_any_threads(num_to_wake);
333 } else if num_awake_but_idle < num_jobs {
334let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
335self.wake_any_threads(num_to_wake);
336 }
337 }
338339#[cold]
340fn wake_any_threads(&self, mut num_to_wake: u32) {
341if num_to_wake > 0 {
342for i in 0..self.worker_sleep_states.len() {
343if self.wake_specific_thread(i) {
344 num_to_wake -= 1;
345if num_to_wake == 0 {
346return;
347 }
348 }
349 }
350 }
351 }
352353fn wake_specific_thread(&self, index: usize) -> bool {
354let sleep_state = &self.worker_sleep_states[index];
355356let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
357if *is_blocked {
358*is_blocked = false;
359360// Increment the number of active threads
361self.data.lock().unwrap().active_threads += 1;
362363sleep_state.condvar.notify_one();
364365// When the thread went to sleep, it will have incremented
366 // this value. When we wake it, its our job to decrement
367 // it. We could have the thread do it, but that would
368 // introduce a delay between when the thread was
369 // *notified* and when this counter was decremented. That
370 // might mislead people with new work into thinking that
371 // there are sleeping threads that they should try to
372 // wake, when in fact there is nothing left for them to
373 // do.
374self.counters.sub_sleeping_thread();
375376true
377} else {
378false
379}
380 }
381}
382383impl IdleState {
384fn wake_fully(&mut self) {
385self.rounds = 0;
386self.jobs_counter = JobsEventCounter::DUMMY;
387 }
388389fn wake_partly(&mut self) {
390self.rounds = ROUNDS_UNTIL_SLEEPY;
391self.jobs_counter = JobsEventCounter::DUMMY;
392 }
393}