use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::{fmt, io, mem, ptr, thread};

use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use smallvec::SmallVec;

use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
use crate::sleep::Sleep;
use crate::tlv::Tlv;
use crate::{
    AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
    ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield, unwind,
};

/// Thread builder used for customization via
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
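///
/// As an illustrative sketch only (mirroring what `DefaultSpawn` does below),
/// a custom spawn handler receives one `ThreadBuilder` per worker, may consult
/// the accessors below, and must eventually call [`run()`](ThreadBuilder::run)
/// on the thread it spawns:
///
/// ```ignore
/// ThreadPoolBuilder::new()
///     .spawn_handler(|thread| {
///         let mut b = std::thread::Builder::new();
///         if let Some(name) = thread.name() {
///             b = b.name(name.to_owned());
///         }
///         if let Some(stack_size) = thread.stack_size() {
///             b = b.stack_size(stack_size);
///         }
///         b.spawn(|| thread.run())?;
///         Ok(())
///     })
///     .build()?;
/// ```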
pub struct ThreadBuilder {
    name: Option<String>,
    stack_size: Option<usize>,
    worker: Worker<JobRef>,
    stealer: Stealer<JobRef>,
    registry: Arc<Registry>,
    index: usize,
}

impl ThreadBuilder {
    /// Gets the index of this thread in the pool, within `0..num_threads`.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
    pub fn stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Executes the main loop for this thread. This will not return until the
    /// thread pool is dropped.
    pub fn run(self) {
        unsafe { main_loop(self) }
    }
}

impl fmt::Debug for ThreadBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThreadBuilder")
            .field("pool", &self.registry.id())
            .field("index", &self.index)
            .field("name", &self.name)
            .field("stack_size", &self.stack_size)
            .finish()
    }
}

/// Generalized trait for spawning a thread in the `Registry`.
///
/// This trait is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
pub trait ThreadSpawn {
    private_decl! {}

    /// Spawn a thread with the `ThreadBuilder` parameters, and then
    /// call `ThreadBuilder::run()`.
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
}

/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;

impl ThreadSpawn for DefaultSpawn {
    private_impl! {}

    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        b.spawn(|| thread.run())?;
        Ok(())
    }
}

/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);

impl<F> CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    pub(super) fn new(spawn: F) -> Self {
        CustomSpawn(spawn)
    }
}

impl<F> ThreadSpawn for CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    private_impl! {}

    #[inline]
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        (self.0)(thread)
    }
}

pub struct Registry {
    thread_infos: Vec<ThreadInfo>,
    sleep: Sleep,
    injected_jobs: Injector<JobRef>,
    broadcasts: Mutex<Vec<Worker<JobRef>>>,
    panic_handler: Option<Box<PanicHandler>>,
    pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,
    pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
    pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,

    // When this latch reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_count: AtomicUsize,
}

///////////////////////////////////////////////////////////////////////////
// Initialization

static mut THE_REGISTRY: Option<Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(default_global_registry)
        .or_else(|err| {
            // SAFETY: we only create a shared reference to `THE_REGISTRY` after the `call_once`
            // that initializes it, and there will be no more mutable accesses at all.
            debug_assert!(THE_REGISTRY_SET.is_completed());
            let the_registry = unsafe { &*ptr::addr_of!(THE_REGISTRY) };
            the_registry.as_ref().ok_or(err)
        })
        .expect("The global thread pool has not been initialized.")
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
pub(super) fn init_global_registry<S>(
    builder: ThreadPoolBuilder<S>,
) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    set_global_registry(|| Registry::new(builder))
}

/// Starts the worker threads (if that has not already happened)
/// by creating a registry with the given callback.
fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(ErrorKind::GlobalPoolAlreadyInitialized));

    THE_REGISTRY_SET.call_once(|| {
        result = registry().map(|registry: Arc<Registry>| {
            // SAFETY: this is the only mutable access to `THE_REGISTRY`, thanks to `Once`, and
            // `global_registry()` only takes a shared reference **after** this `call_once`.
            unsafe {
                ptr::addr_of_mut!(THE_REGISTRY).write(Some(registry));
                (*ptr::addr_of!(THE_REGISTRY)).as_ref().unwrap_unchecked()
            }
        })
    });

    result
}

fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
    let result = Registry::new(ThreadPoolBuilder::new());

    // If we're running in an environment that doesn't support threads at all, we can fall back to
    // using the current thread alone. This is crude, and probably won't work for non-blocking
    // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
    //
    // Notably, this allows current WebAssembly targets to work even though their threading support
    // is stubbed out, and we won't have to change anything if they do add real threading.
    let unsupported = matches!(&result, Err(e) if e.is_unsupported());
    if unsupported && WorkerThread::current().is_null() {
        let builder = ThreadPoolBuilder::new().num_threads(1).spawn_handler(|thread| {
            // Rather than starting a new thread, we're just taking over the current thread
            // *without* running the main loop, so we can still return from here.
            // The WorkerThread is leaked, but we never shutdown the global pool anyway.
            let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
            let registry = &*worker_thread.registry;
            let index = worker_thread.index;

            unsafe {
                WorkerThread::set_current(worker_thread);

                // let registry know we are ready to do work
                Latch::set(&registry.thread_infos[index].primed);
            }

            Ok(())
        });

        let fallback_result = Registry::new(builder);
        if fallback_result.is_ok() {
            return fallback_result;
        }
    }

    result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub(super) fn new<S>(
        mut builder: ThreadPoolBuilder<S>,
    ) -> Result<Arc<Self>, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        // Soft-limit the number of threads that we can actually support.
        let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());

        let breadth_first = builder.get_breadth_first();

        let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = if breadth_first { Worker::new_fifo() } else { Worker::new_lifo() };
                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = Worker::new_fifo();
                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let registry = Arc::new(Registry {
            thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
            sleep: Sleep::new(n_threads),
            injected_jobs: Injector::new(),
            broadcasts: Mutex::new(broadcasts),
            terminate_count: AtomicUsize::new(1),
            panic_handler: builder.take_panic_handler(),
            deadlock_handler: builder.take_deadlock_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
            acquire_thread_handler: builder.take_acquire_thread_handler(),
            release_thread_handler: builder.take_release_thread_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() {
            let thread = ThreadBuilder {
                name: builder.get_thread_name(index),
                stack_size: builder.get_stack_size(),
                registry: Arc::clone(&registry),
                worker,
                stealer,
                index,
            };
            if let Err(e) = builder.get_spawn_handler().spawn(thread) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry)
    }

    pub fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            let registry = if worker_thread.is_null() {
                global_registry()
            } else {
                &(*worker_thread).registry
            };
            Arc::clone(registry)
        }
    }

    /// Returns the number of threads in the current registry. This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc`.
    pub(super) fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns the current `WorkerThread` if it's part of this `Registry`.
    pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
        unsafe {
            let worker = WorkerThread::current().as_ref()?;
            if worker.registry().id() == self.id() { Some(worker) } else { None }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub(super) fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId { addr: self as *const Self as usize }
    }

    pub(super) fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

    pub(super) fn catch_unwind(&self, f: impl FnOnce()) {
        if let Err(err) = unwind::halt_unwinding(f) {
            // If there is no handler, or if that handler itself panics, then we abort.
            let abort_guard = unwind::AbortIfPanic;
            if let Some(ref handler) = self.panic_handler {
                handler(err);
                mem::forget(abort_guard);
            }
        }
    }
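
    // A hedged sketch of how the `panic_handler` consulted above gets installed
    // (the logging body is hypothetical; `panic_handler` is the `ThreadPoolBuilder`
    // method whose result `take_panic_handler()` hands us in `new()`):
    //
    //     ThreadPoolBuilder::new()
    //         .panic_handler(|err| eprintln!("worker panicked: {err:?}"))
    //         .build()?;
    //
    // Without such a handler, a panic escaping a worker aborts the process via
    // the `AbortIfPanic` guard used in `catch_unwind` above.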

    /// Waits for the worker threads to get up and running. This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    /// "ready to go".
    pub(super) fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    pub(super) fn wait_until_stopped(&self) {
        self.release_thread();
        for info in &self.thread_infos {
            info.stopped.wait();
        }
        self.acquire_thread();
    }

    pub(crate) fn acquire_thread(&self) {
        if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
            acquire_thread_handler();
        }
    }

    pub(crate) fn release_thread(&self) {
        if let Some(ref release_thread_handler) = self.release_thread_handler {
            release_thread_handler();
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.
    ///
    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub(super) fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(job_ref);
            }
        }
    }

    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub(super) fn inject(&self, injected_job: JobRef) {
        // It should not be possible for `state.terminate` to be true
        // here. It is only set to true when the user creates (and
        // drops) a `ThreadPool`; and, in that case, they cannot be
        // calling `inject()` later, since they dropped their
        // `ThreadPool`.
        debug_assert_ne!(
            self.terminate_count.load(Ordering::Acquire),
            0,
            "inject() sees state.terminate as true"
        );

        let queue_was_empty = self.injected_jobs.is_empty();

        self.injected_jobs.push(injected_job);
        self.sleep.new_injected_jobs(1, queue_was_empty);
    }

    pub(crate) fn has_injected_job(&self) -> bool {
        !self.injected_jobs.is_empty()
    }

    fn pop_injected_job(&self) -> Option<JobRef> {
        loop {
            match self.injected_jobs.steal() {
                Steal::Success(job) => return Some(job),
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    /// Push a job into each thread's own "external jobs" queue; it will be
    /// executed only on that thread, when it has nothing else to do locally,
    /// before it tries to steal other work.
    ///
    /// **Panics** if not given exactly as many jobs as there are threads.
    pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
        assert_eq!(self.num_threads(), injected_jobs.len());
        {
            let broadcasts = self.broadcasts.lock().unwrap();

            // It should not be possible for `state.terminate` to be true
            // here. It is only set to true when the user creates (and
            // drops) a `ThreadPool`; and, in that case, they cannot be
            // calling `inject_broadcast()` later, since they dropped their
            // `ThreadPool`.
            debug_assert_ne!(
                self.terminate_count.load(Ordering::Acquire),
                0,
                "inject_broadcast() sees state.terminate as true"
            );

            assert_eq!(broadcasts.len(), injected_jobs.len());
            for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) {
                worker.push(job_ref);
            }
        }
        for i in 0..self.num_threads() {
            self.sleep.notify_worker_latch_is_set(i);
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well. The second argument indicates `true` if injection
    /// was performed, `false` if executed directly.
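    ///
    /// A hedged usage sketch (the closure body is hypothetical):
    ///
    /// ```ignore
    /// let result = registry.in_worker(|worker_thread, injected| {
    ///     // `injected == false`: we were already on one of this registry's
    ///     // workers; `injected == true`: the closure was injected and we
    ///     // blocked until a worker picked it up.
    ///     expensive_computation(worker_thread.index())
    /// });
    /// ```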
    pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

        LOCK_LATCH.with(|l| {
            // This thread isn't a member of *any* thread pool, so just block.
            debug_assert!(WorkerThread::current().is_null());
            let job = StackJob::new(
                Tlv::null(),
                |injected| {
                    let worker_thread = WorkerThread::current();
                    assert!(injected && !worker_thread.is_null());
                    op(unsafe { &*worker_thread }, true)
                },
                LatchRef::new(l),
            );
            self.inject(unsafe { job.as_job_ref() });
            self.release_thread();
            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
            self.acquire_thread();

            unsafe { job.into_result() }
        })
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = SpinLatch::cross(current_thread);
        let job = StackJob::new(
            Tlv::null(),
            |injected| {
                let worker_thread = WorkerThread::current();
                assert!(injected && !worker_thread.is_null());
                op(unsafe { &*worker_thread }, true)
            },
            latch,
        );
        self.inject(unsafe { job.as_job_ref() });
        unsafe { current_thread.wait_until(&job.latch) };
        unsafe { job.into_result() }
    }

    /// Increments the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
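    ///
    /// A minimal sketch of the intended pairing (the job plumbing is elided):
    ///
    /// ```ignore
    /// registry.increment_terminate_count();
    /// // ... hand the asynchronous job to the pool ...
    /// // and once that job has executed (or is dropped unexecuted):
    /// registry.terminate();
    /// ```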
    pub(super) fn increment_terminate_count(&self) {
        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
        debug_assert!(previous != 0, "registry ref count incremented from zero");
        assert!(previous != usize::MAX, "overflow in registry ref count");
    }

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub(super) fn terminate(&self) {
        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            for (i, thread_info) in self.thread_infos.iter().enumerate() {
                unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
            }
        }
    }

    /// Notify the worker that the latch they are sleeping on has been "set".
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.sleep.notify_worker_latch_is_set(target_worker_index);
    }
}

/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
/// if no other worker thread is active
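///
/// A hedged sketch of one possible pairing with [`mark_unblocked`] (the
/// blocking call is hypothetical):
///
/// ```ignore
/// mark_blocked();
/// let msg = receiver.recv().unwrap();
/// mark_unblocked(&Registry::current());
/// ```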
#[inline]
pub fn mark_blocked() {
    let worker_thread = WorkerThread::current();
    assert!(!worker_thread.is_null());
    unsafe {
        let registry = &(*worker_thread).registry;
        registry.sleep.mark_blocked(&registry.deadlock_handler)
    }
}

/// Mark a previously blocked Rayon worker thread as unblocked
#[inline]
pub fn mark_unblocked(registry: &Registry) {
    registry.sleep.mark_unblocked()
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
    addr: usize,
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// The latch used to signal that termination has been requested.
    /// This latch is *set* by the `terminate` method on the
    /// `Registry`, once the registry's main "terminate" counter
    /// reaches zero.
    terminate: OnceLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            terminate: OnceLatch::new(),
            stealer,
        }
    }
}

///////////////////////////////////////////////////////////////////////////
// WorkerThread identifiers

pub(super) struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Worker<JobRef>,

    /// the "stealer" half of the worker's broadcast deque
    stealer: Stealer<JobRef>,

    /// local queue used for `spawn_fifo` indirection
    fifo: JobFifo,

    pub(crate) index: usize,

    /// A weak random number generator.
    rng: XorShift64Star,

    pub(crate) registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
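//
// A rough sketch of the lifecycle, as implemented in this file:
//
//     unsafe { WorkerThread::set_current(worker_thread) };   // on entry, in `main_loop`
//     let me: *const WorkerThread = WorkerThread::current(); // anywhere on that thread
//     // ... and `Drop for WorkerThread` resets the cell back to null.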
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
}

impl From<ThreadBuilder> for WorkerThread {
    fn from(thread: ThreadBuilder) -> Self {
        Self {
            worker: thread.worker,
            stealer: thread.stealer,
            fifo: JobFifo::new(),
            index: thread.index,
            rng: XorShift64Star::new(),
            registry: thread.registry,
        }
    }
}

impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Undo `set_current`
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().eq(&(self as *const _)));
            t.set(ptr::null());
        });
    }
}

impl WorkerThread {
    /// Gets the `WorkerThread` index for the current thread; returns
    /// NULL if this is not a worker thread. This pointer is valid
    /// anywhere on the current thread.
    #[inline]
    pub(super) fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(Cell::get)
    }

    /// Sets `self` as the worker thread index for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    #[inline]
    pub(super) fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub(super) fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub(super) unsafe fn push(&self, job: JobRef) {
        let queue_was_empty = self.worker.is_empty();
        self.worker.push(job);
        self.registry.sleep.new_internal_jobs(1, queue_was_empty);
    }

    #[inline]
    pub(super) unsafe fn push_fifo(&self, job: JobRef) {
        unsafe { self.push(self.fifo.push(job)) };
    }

    #[inline]
    pub(super) fn local_deque_is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub(super) fn take_local_job(&self) -> Option<JobRef> {
        let popped_job = self.worker.pop();

        if popped_job.is_some() {
            return popped_job;
        }

        loop {
            match self.stealer.steal() {
                Steal::Success(job) => return Some(job),
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    pub(super) fn has_injected_job(&self) -> bool {
        !self.stealer.is_empty() || self.registry.has_injected_job()
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
        unsafe { self.wait_or_steal_until(latch, false) };
    }

    /// Wait until the latch is set. Executes local jobs if `is_job` is true for them and
    /// `all_jobs_started` still returns false.
    #[inline]
    pub(super) unsafe fn wait_for_jobs<L: AsCoreLatch + ?Sized, const BROADCAST_JOBS: bool>(
        &self,
        latch: &L,
        mut all_jobs_started: impl FnMut() -> bool,
        mut is_job: impl FnMut(&JobRef) -> bool,
        mut execute_job: impl FnMut(JobRef),
    ) {
        let mut jobs = SmallVec::<[JobRef; 8]>::new();
        let mut broadcast_jobs = SmallVec::<[JobRef; 8]>::new();

        while !all_jobs_started() {
            if let Some(job) = self.worker.pop() {
                if is_job(&job) {
                    execute_job(job);
                } else {
                    jobs.push(job);
                }
            } else {
                if BROADCAST_JOBS {
                    let broadcast_job = loop {
                        match self.stealer.steal() {
                            Steal::Success(job) => break Some(job),
                            Steal::Empty => break None,
                            Steal::Retry => continue,
                        }
                    };
                    if let Some(job) = broadcast_job {
                        if is_job(&job) {
                            execute_job(job);
                        } else {
                            broadcast_jobs.push(job);
                        }
                    }
                }
                break;
            }
        }

        // Restore the jobs that we weren't looking for.
        for job in jobs {
            self.worker.push(job);
        }
        if BROADCAST_JOBS {
            let broadcasts = self.registry.broadcasts.lock().unwrap();
            for job in broadcast_jobs {
                broadcasts[self.index].push(job);
            }
        }

        // Wait for the jobs to finish.
        unsafe { self.wait_until(latch) };
        debug_assert!(latch.as_core_latch().probe());
    }

    pub(super) unsafe fn wait_or_steal_until<L: AsCoreLatch + ?Sized>(
        &self,
        latch: &L,
        steal: bool,
    ) {
        let latch = latch.as_core_latch();
        if !latch.probe() {
            if steal {
                unsafe { self.wait_or_steal_until_cold(latch) };
            } else {
                unsafe { self.wait_until_cold(latch) };
            }
        }
    }

    #[cold]
    unsafe fn wait_or_steal_until_cold(&self, latch: &CoreLatch) {
        // the code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*
        let abort_guard = unwind::AbortIfPanic;

        'outer: while !latch.probe() {
            // Check for local work *before* we start marking ourself idle,
            // especially to avoid modifying shared sleep state.
            if let Some(job) = self.take_local_job() {
                unsafe { self.execute(job) };
                continue;
            }

            let mut idle_state = self.registry.sleep.start_looking(self.index);
            while !latch.probe() {
                if let Some(job) = self.find_work() {
                    self.registry.sleep.work_found();
                    unsafe { self.execute(job) };
                    // The job might have injected local work, so go back to the outer loop.
                    continue 'outer;
                } else {
                    self.registry.sleep.no_work_found(&mut idle_state, latch, &self, true)
                }
            }

            // If we were sleepy, we are not anymore. We "found work" --
            // whatever the surrounding thread was doing before it had to wait.
            self.registry.sleep.work_found();
            break;
        }

        mem::forget(abort_guard); // successful execution, do not abort
    }

    #[cold]
    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
        // the code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*
        let abort_guard = unwind::AbortIfPanic;

        let mut idle_state = self.registry.sleep.start_looking(self.index);
        while !latch.probe() {
            self.registry.sleep.no_work_found(&mut idle_state, latch, &self, false);
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to wait.
        self.registry.sleep.work_found();

        mem::forget(abort_guard); // successful execution, do not abort
    }

    unsafe fn wait_until_out_of_work(&self) {
        debug_assert_eq!(self as *const _, WorkerThread::current());
        let registry = &*self.registry;
        let index = self.index;

        registry.acquire_thread();
        unsafe { self.wait_or_steal_until(&registry.thread_infos[index].terminate, true) };

        // Should not be any work left in our queue.
        debug_assert!(self.take_local_job().is_none());

        // Let registry know we are done
        unsafe { Latch::set(&registry.thread_infos[index].stopped) };
    }

    fn find_work(&self) -> Option<JobRef> {
        // Try to find some work to do. We give preference first
        // to things in our local deque, then in other workers
        // deques, and finally to injected jobs from the
        // outside. The idea is to finish what we started before
        // we take on something new.
        self.take_local_job().or_else(|| self.steal()).or_else(|| self.registry.pop_injected_job())
    }

    pub(super) fn yield_now(&self) -> Yield {
        match self.find_work() {
            Some(job) => unsafe {
                self.execute(job);
                Yield::Executed
            },
            None => Yield::Idle,
        }
    }

    pub(super) fn yield_local(&self) -> Yield {
        match self.take_local_job() {
            Some(job) => unsafe {
                self.execute(job);
                Yield::Executed
            },
            None => Yield::Idle,
        }
    }

    #[inline]
    pub(super) unsafe fn execute(&self, job: JobRef) {
        unsafe { job.execute() };
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.local_deque_is_empty());

        // otherwise, try to steal
        let thread_infos = &self.registry.thread_infos.as_slice();
        let num_threads = thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

        loop {
            let mut retry = false;
            let start = self.rng.next_usize(num_threads);
            let job = (start..num_threads)
                .chain(0..start)
                .filter(move |&i| i != self.index)
                .find_map(|victim_index| {
                    let victim = &thread_infos[victim_index];
                    match victim.stealer.steal() {
                        Steal::Success(job) => Some(job),
                        Steal::Empty => None,
                        Steal::Retry => {
                            retry = true;
                            None
                        }
                    }
                });
            if job.is_some() || !retry {
                return job;
            }
        }
    }
}

unsafe fn main_loop(thread: ThreadBuilder) {
    let worker_thread = &WorkerThread::from(thread);
    unsafe { WorkerThread::set_current(worker_thread) };
    let registry = &*worker_thread.registry;
    let index = worker_thread.index;

    // let registry know we are ready to do work
    unsafe { Latch::set(&registry.thread_infos[index].primed) };

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        registry.catch_unwind(|| handler(index));
    }

    unsafe { worker_thread.wait_until_out_of_work() };

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        registry.catch_unwind(|| handler(index));
        // We're already exiting the thread, there's nothing else to do.
    }

    registry.release_thread();
}

/// If already in a worker-thread, just execute `op`. Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well. The second argument indicates
/// `true` if injection was performed, `false` if executed directly.
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker(op)
        }
    }
}

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
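///
/// A hedged usage sketch (this mirrors how `WorkerThread::steal` picks a
/// starting victim index):
///
/// ```ignore
/// let rng = XorShift64Star::new();
/// let start = rng.next_usize(num_threads); // some index in 0..num_threads
/// ```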
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            static COUNTER: AtomicUsize = AtomicUsize::new(0);
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star { state: Cell::new(seed) }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}