rustc_data_structures/sync/
worker_local.rs1use std::cell::{Cell, OnceCell};
2use std::num::NonZero;
3use std::ops::Deref;
4use std::ptr;
5use std::sync::Arc;
6
7use parking_lot::Mutex;
8
9use crate::outline;
10use crate::sync::CacheAligned;
11
12#[derive(Clone, Copy, PartialEq)]
15struct RegistryId(*const RegistryData);
16
17impl RegistryId {
18 #[inline(always)]
19 fn verify(self) -> usize {
26 let (id, index) = THREAD_DATA.with(|data| (data.registry_id.get(), data.index.get()));
27
28 if id == self { index } else { outline(|| panic!("Unable to verify registry association")) }
29 }
30}
31
32struct RegistryData {
33 thread_limit: NonZero<usize>,
34 threads: Mutex<usize>,
35}
36
37#[derive(Clone)]
39pub struct Registry(Arc<RegistryData>);
40
41thread_local! {
42 static REGISTRY: OnceCell<Registry> = const { OnceCell::new() };
45}
46
47struct ThreadData {
48 registry_id: Cell<RegistryId>,
49 index: Cell<usize>,
50}
51
52thread_local! {
53 static THREAD_DATA: ThreadData = const { ThreadData {
56 registry_id: Cell::new(RegistryId(ptr::null())),
57 index: Cell::new(0),
58 }};
59}
60
61impl Registry {
62 pub fn new(thread_limit: NonZero<usize>) -> Self {
64 Registry(Arc::new(RegistryData { thread_limit, threads: Mutex::new(0) }))
65 }
66
67 pub fn current() -> Self {
69 REGISTRY.with(|registry| registry.get().cloned().expect("No associated registry"))
70 }
71
72 pub fn register(&self) {
75 let mut threads = self.0.threads.lock();
76 if *threads < self.0.thread_limit.get() {
77 REGISTRY.with(|registry| {
78 if registry.get().is_some() {
79 drop(threads);
80 panic!("Thread already has a registry");
81 }
82 registry.set(self.clone()).ok();
83 THREAD_DATA.with(|data| {
84 data.registry_id.set(self.id());
85 data.index.set(*threads);
86 });
87 *threads += 1;
88 });
89 } else {
90 drop(threads);
91 panic!("Thread limit reached");
92 }
93 }
94
95 fn id(&self) -> RegistryId {
97 RegistryId(&*self.0)
98 }
99}
100
101pub struct WorkerLocal<T> {
105 locals: Box<[CacheAligned<T>]>,
106 registry: Registry,
107}
108
109unsafe impl<T: Send> Sync for WorkerLocal<T> {}
114
115impl<T> WorkerLocal<T> {
116 #[inline]
119 pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
120 let registry = Registry::current();
121 WorkerLocal {
122 locals: (0..registry.0.thread_limit.get()).map(|i| CacheAligned(initial(i))).collect(),
123 registry,
124 }
125 }
126
127 #[inline]
129 pub fn into_inner(self) -> impl Iterator<Item = T> {
130 self.locals.into_vec().into_iter().map(|local| local.0)
131 }
132}
133
134impl<T> Deref for WorkerLocal<T> {
135 type Target = T;
136
137 #[inline(always)]
138 fn deref(&self) -> &T {
139 unsafe { &self.locals.get_unchecked(self.registry.id().verify()).0 }
142 }
143}
144
145impl<T: Default> Default for WorkerLocal<T> {
146 fn default() -> Self {
147 WorkerLocal::new(|_| T::default())
148 }
149}