Skip to main content

rustc_thread_pool/sleep/
mod.rs

1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use std::sync::atomic::Ordering;
5use std::sync::{Condvar, Mutex};
6use std::thread;
7
8use crossbeam_utils::CachePadded;
9
10use crate::DeadlockHandler;
11use crate::latch::CoreLatch;
12use crate::registry::WorkerThread;
13
14mod counters;
15pub(crate) use self::counters::THREADS_MAX;
16use self::counters::{AtomicCounters, JobsEventCounter};
17
18struct SleepData {
19    /// The number of threads in the thread pool.
20    worker_count: usize,
21
22    /// The number of threads in the thread pool which are running and
23    /// aren't blocked in user code or sleeping.
24    active_threads: usize,
25
26    /// The number of threads which are blocked in user code.
27    /// This doesn't include threads blocked by this module.
28    blocked_threads: usize,
29}
30
31impl SleepData {
32    /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33    #[inline]
34    pub(super) fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35        if self.active_threads == 0 && self.blocked_threads > 0 {
36            (deadlock_handler.as_ref().unwrap())();
37        }
38    }
39}
40
41/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
42/// of workers. It has callbacks that are invoked periodically at significant events,
43/// such as when workers are looping and looking for work, when latches are set, or when
44/// jobs are published, and it either blocks threads or wakes them in response to these
45/// events. See the [`README.md`] in this module for more details.
46///
47/// [`README.md`]: README.md
48pub(super) struct Sleep {
49    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
50    /// them block.
51    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
52
53    counters: AtomicCounters,
54
55    data: Mutex<SleepData>,
56}
57
58/// An instance of this struct is created when a thread becomes idle.
59/// It is consumed when the thread finds work, and passed by `&mut`
60/// reference for operations that preserve the idle state. (In other
61/// words, producing one of these structs is evidence the thread is
62/// idle.) It tracks state such as how long the thread has been idle.
63pub(super) struct IdleState {
64    /// What is worker index of the idle thread?
65    worker_index: usize,
66
67    /// How many rounds have we been circling without sleeping?
68    rounds: u32,
69
70    /// Once we become sleepy, what was the sleepy counter value?
71    /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
72    jobs_counter: JobsEventCounter,
73}
74
75/// The "sleep state" for an individual worker.
76#[derive(#[automatically_derived]
impl ::core::default::Default for WorkerSleepState {
    #[inline]
    fn default() -> WorkerSleepState {
        WorkerSleepState {
            is_blocked: ::core::default::Default::default(),
            condvar: ::core::default::Default::default(),
        }
    }
}Default)]
77struct WorkerSleepState {
78    /// Set to true when the worker goes to sleep; set to false when
79    /// the worker is notified or when it wakes.
80    is_blocked: Mutex<bool>,
81
82    condvar: Condvar,
83}
84
85const ROUNDS_UNTIL_SLEEPY: u32 = 32;
86const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
87
88impl Sleep {
89    pub(super) fn new(n_threads: usize) -> Sleep {
90        if !(n_threads <= THREADS_MAX) {
    ::core::panicking::panic("assertion failed: n_threads <= THREADS_MAX")
};assert!(n_threads <= THREADS_MAX);
91        Sleep {
92            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
93            counters: AtomicCounters::new(),
94            data: Mutex::new(SleepData {
95                worker_count: n_threads,
96                active_threads: n_threads,
97                blocked_threads: 0,
98            }),
99        }
100    }
101
102    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
103    /// if no other worker thread is active
104    #[inline]
105    pub(super) fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
106        let mut data = self.data.lock().unwrap();
107        if true {
    if !(data.active_threads > 0) {
        ::core::panicking::panic("assertion failed: data.active_threads > 0")
    };
};debug_assert!(data.active_threads > 0);
108        if true {
    if !(data.blocked_threads < data.worker_count) {
        ::core::panicking::panic("assertion failed: data.blocked_threads < data.worker_count")
    };
};debug_assert!(data.blocked_threads < data.worker_count);
109        data.active_threads -= 1;
110        data.blocked_threads += 1;
111
112        data.deadlock_check(deadlock_handler);
113    }
114
115    /// Mark a previously blocked Rayon worker thread as unblocked
116    #[inline]
117    pub(super) fn mark_unblocked(&self) {
118        let mut data = self.data.lock().unwrap();
119        if true {
    if !(data.active_threads < data.worker_count) {
        ::core::panicking::panic("assertion failed: data.active_threads < data.worker_count")
    };
};debug_assert!(data.active_threads < data.worker_count);
120        if true {
    if !(data.blocked_threads > 0) {
        ::core::panicking::panic("assertion failed: data.blocked_threads > 0")
    };
};debug_assert!(data.blocked_threads > 0);
121        data.active_threads += 1;
122        data.blocked_threads -= 1;
123    }
124
125    #[inline]
126    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
127        self.counters.add_inactive_thread();
128
129        IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
130    }
131
132    #[inline]
133    pub(super) fn work_found(&self) {
134        // If we were the last idle thread and other threads are still sleeping,
135        // then we should wake up another thread.
136        let threads_to_wake = self.counters.sub_inactive_thread();
137        self.wake_any_threads(threads_to_wake as u32);
138    }
139
140    #[inline]
141    pub(super) fn no_work_found(
142        &self,
143        idle_state: &mut IdleState,
144        latch: &CoreLatch,
145        thread: &WorkerThread,
146        steal: bool,
147    ) {
148        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
149            thread::yield_now();
150            idle_state.rounds += 1;
151        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
152            idle_state.jobs_counter = self.announce_sleepy();
153            idle_state.rounds += 1;
154            thread::yield_now();
155        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
156            idle_state.rounds += 1;
157            thread::yield_now();
158        } else {
159            if true {
    match (&idle_state.rounds, &ROUNDS_UNTIL_SLEEPING) {
        (left_val, right_val) => {
            if !(*left_val == *right_val) {
                let kind = ::core::panicking::AssertKind::Eq;
                ::core::panicking::assert_failed(kind, &*left_val,
                    &*right_val, ::core::option::Option::None);
            }
        }
    };
};debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
160            self.sleep(idle_state, latch, thread, steal);
161        }
162    }
163
164    #[cold]
165    fn announce_sleepy(&self) -> JobsEventCounter {
166        self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
167    }
168
169    #[cold]
170    fn sleep(
171        &self,
172        idle_state: &mut IdleState,
173        latch: &CoreLatch,
174        thread: &WorkerThread,
175        steal: bool,
176    ) {
177        let worker_index = idle_state.worker_index;
178
179        if !latch.get_sleepy() {
180            return;
181        }
182
183        let sleep_state = &self.worker_sleep_states[worker_index];
184        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
185        if true {
    if !!*is_blocked {
        ::core::panicking::panic("assertion failed: !*is_blocked")
    };
};debug_assert!(!*is_blocked);
186
187        // Our latch was signalled. We should wake back up fully as we
188        // will have some stuff to do.
189        if !latch.fall_asleep() {
190            idle_state.wake_fully();
191            return;
192        }
193
194        loop {
195            let counters = self.counters.load(Ordering::SeqCst);
196
197            // Check if the JEC has changed since we got sleepy.
198            if true {
    if !idle_state.jobs_counter.is_sleepy() {
        ::core::panicking::panic("assertion failed: idle_state.jobs_counter.is_sleepy()")
    };
};debug_assert!(idle_state.jobs_counter.is_sleepy());
199            if counters.jobs_counter() != idle_state.jobs_counter {
200                // JEC has changed, so a new job was posted, but for some reason
201                // we didn't see it. We should return to just before the SLEEPY
202                // state so we can do another search and (if we fail to find
203                // work) go back to sleep.
204                idle_state.wake_partly();
205                latch.wake_up();
206                return;
207            }
208
209            // Otherwise, let's move from IDLE to SLEEPING.
210            if self.counters.try_add_sleeping_thread(counters) {
211                break;
212            }
213        }
214
215        // Successfully registered as asleep.
216
217        // We have one last check for injected jobs to do. This protects against
218        // deadlock in the very unlikely event that
219        //
220        // - an external job is being injected while we are sleepy
221        // - that job triggers the rollover over the JEC such that we don't see it
222        // - we are the last active worker thread
223        std::sync::atomic::fence(Ordering::SeqCst);
224        if steal && thread.has_injected_job() {
225            // If we see an externally injected job, then we have to 'wake
226            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
227            // the one that wakes us.)
228            self.counters.sub_sleeping_thread();
229        } else {
230            {
231                // Decrement the number of active threads and check for a deadlock
232                let mut data = self.data.lock().unwrap();
233                data.active_threads -= 1;
234                data.deadlock_check(&thread.registry.deadlock_handler);
235            }
236
237            // If we don't see an injected job (the normal case), then flag
238            // ourselves as asleep and wait till we are notified.
239            //
240            // (Note that `is_blocked` is held under a mutex and the mutex was
241            // acquired *before* we incremented the "sleepy counter". This means
242            // that whomever is coming to wake us will have to wait until we
243            // release the mutex in the call to `wait`, so they will see this
244            // boolean as true.)
245            thread.registry.release_thread();
246            *is_blocked = true;
247            while *is_blocked {
248                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
249            }
250
251            // Drop `is_blocked` now in case `acquire_thread` blocks
252            drop(is_blocked);
253
254            thread.registry.acquire_thread();
255        }
256
257        // Update other state:
258        idle_state.wake_fully();
259        latch.wake_up();
260    }
261
262    /// Notify the given thread that it should wake up (if it is
263    /// sleeping). When this method is invoked, we typically know the
264    /// thread is asleep, though in rare cases it could have been
265    /// awoken by (e.g.) new work having been posted.
266    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
267        self.wake_specific_thread(target_worker_index);
268    }
269
270    /// Signals that `num_jobs` new jobs were injected into the thread
271    /// pool from outside. This function will ensure that there are
272    /// threads available to process them, waking threads from sleep
273    /// if necessary.
274    ///
275    /// # Parameters
276    ///
277    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
278    ///   We'll try to get at least one thread per job.
279    #[inline]
280    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
281        // This fence is needed to guarantee that threads
282        // as they are about to fall asleep, observe any
283        // new jobs that may have been injected.
284        std::sync::atomic::fence(Ordering::SeqCst);
285
286        self.new_jobs(num_jobs, queue_was_empty)
287    }
288
289    /// Signals that `num_jobs` new jobs were pushed onto a thread's
290    /// local deque. This function will try to ensure that there are
291    /// threads available to process them, waking threads from sleep
292    /// if necessary. However, this is not guaranteed: under certain
293    /// race conditions, the function may fail to wake any new
294    /// threads; in that case the existing thread should eventually
295    /// pop the job.
296    ///
297    /// # Parameters
298    ///
299    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
300    ///   We'll try to get at least one thread per job.
301    #[inline]
302    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
303        self.new_jobs(num_jobs, queue_was_empty)
304    }
305
306    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
307    #[inline]
308    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
309        // Read the counters and -- if sleepy workers have announced themselves
310        // -- announce that there is now work available. The final value of `counters`
311        // with which we exit the loop thus corresponds to a state when
312        let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
313        let num_awake_but_idle = counters.awake_but_idle_threads();
314        let num_sleepers = counters.sleeping_threads();
315
316        if num_sleepers == 0 {
317            // nobody to wake
318            return;
319        }
320
321        // Promote from u16 to u32 so we can interoperate with
322        // num_jobs more easily.
323        let num_awake_but_idle = num_awake_but_idle as u32;
324        let num_sleepers = num_sleepers as u32;
325
326        // If the queue is non-empty, then we always wake up a worker
327        // -- clearly the existing idle jobs aren't enough. Otherwise,
328        // check to see if we have enough idle workers.
329        if !queue_was_empty {
330            let num_to_wake = Ord::min(num_jobs, num_sleepers);
331            self.wake_any_threads(num_to_wake);
332        } else if num_awake_but_idle < num_jobs {
333            let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
334            self.wake_any_threads(num_to_wake);
335        }
336    }
337
338    #[cold]
339    fn wake_any_threads(&self, mut num_to_wake: u32) {
340        if num_to_wake > 0 {
341            for i in 0..self.worker_sleep_states.len() {
342                if self.wake_specific_thread(i) {
343                    num_to_wake -= 1;
344                    if num_to_wake == 0 {
345                        return;
346                    }
347                }
348            }
349        }
350    }
351
352    fn wake_specific_thread(&self, index: usize) -> bool {
353        let sleep_state = &self.worker_sleep_states[index];
354
355        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
356        if *is_blocked {
357            *is_blocked = false;
358
359            // Increment the number of active threads
360            self.data.lock().unwrap().active_threads += 1;
361
362            sleep_state.condvar.notify_one();
363
364            // When the thread went to sleep, it will have incremented
365            // this value. When we wake it, its our job to decrement
366            // it. We could have the thread do it, but that would
367            // introduce a delay between when the thread was
368            // *notified* and when this counter was decremented. That
369            // might mislead people with new work into thinking that
370            // there are sleeping threads that they should try to
371            // wake, when in fact there is nothing left for them to
372            // do.
373            self.counters.sub_sleeping_thread();
374
375            true
376        } else {
377            false
378        }
379    }
380}
381
382impl IdleState {
383    fn wake_fully(&mut self) {
384        self.rounds = 0;
385        self.jobs_counter = JobsEventCounter::DUMMY;
386    }
387
388    fn wake_partly(&mut self) {
389        self.rounds = ROUNDS_UNTIL_SLEEPY;
390        self.jobs_counter = JobsEventCounter::DUMMY;
391    }
392}