1use std::sync::{Arc, LazyLock, OnceLock};
23pub use jobserver_crate::{Acquired, Client, HelperThread};
4use jobserver_crate::{FromEnv, FromEnvErrorKind};
5use parking_lot::{Condvar, Mutex};
67// We can only call `from_env_ext` once per process
89// We stick this in a global because there could be multiple rustc instances
10// in this process, and the jobserver is per-process.
11static GLOBAL_CLIENT: LazyLock<Result<Client, String>> = LazyLock::new(|| {
12// Note that this is unsafe because it may misinterpret file descriptors
13 // on Unix as jobserver file descriptors. We hopefully execute this near
14 // the beginning of the process though to ensure we don't get false
15 // positives, or in other words we try to execute this before we open
16 // any file descriptors ourselves.
17let FromEnv { client, var } = unsafe { Client::from_env_ext(true) };
1819let error = match client {
20Ok(client) => return Ok(client),
21Err(e) => e,
22 };
2324if #[allow(non_exhaustive_omitted_patterns)] match error.kind() {
FromEnvErrorKind::NoEnvVar | FromEnvErrorKind::NoJobserver |
FromEnvErrorKind::NegativeFd | FromEnvErrorKind::Unsupported => true,
_ => false,
}matches!(
25 error.kind(),
26 FromEnvErrorKind::NoEnvVar
27 | FromEnvErrorKind::NoJobserver
28 | FromEnvErrorKind::NegativeFd
29 | FromEnvErrorKind::Unsupported
30 ) {
31return Ok(default_client());
32 }
3334// Environment specifies jobserver, but it looks incorrect.
35 // Safety: `error.kind()` should be `NoEnvVar` if `var == None`.
36let (name, value) = var.unwrap();
37Err(::alloc::__export::must_use({
::alloc::fmt::format(format_args!("failed to connect to jobserver from environment variable `{1}={0:?}`: {2}",
value, name, error))
})format!(
38"failed to connect to jobserver from environment variable `{name}={:?}`: {error}",
39 value
40 ))
41});
4243// Create a new jobserver if there's no inherited one.
44fn default_client() -> Client {
45// Pick a "reasonable maximum" capping out at 32
46 // so we don't take everything down by hogging the process run queue.
47 // The fixed number is used to have deterministic compilation across machines.
48let client = Client::new(32).expect("failed to create jobserver");
4950// Acquire a token for the main thread which we can release later
51client.acquire_raw().ok();
5253client54}
5556static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
5758pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
59let client_checked = match &*GLOBAL_CLIENT {
60Ok(client) => client.clone(),
61Err(e) => {
62report_warning(e);
63default_client()
64 }
65 };
66GLOBAL_CLIENT_CHECKED.set(client_checked).ok();
67}
6869const ACCESS_ERROR: &str = "jobserver check should have been called earlier";
7071pub fn client() -> Client {
72GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
73}
7475struct ProxyData {
76/// The number of tokens assigned to threads.
77 /// If this is 0, a single token is still assigned to this process, but is unused.
78used: u16,
7980/// The number of threads requesting a token
81pending: u16,
82}
8384/// This is a jobserver proxy used to ensure that we hold on to at least one token.
85pub struct Proxy {
86 client: Client,
87 data: Mutex<ProxyData>,
8889/// Threads which are waiting on a token will wait on this.
90wake_pending: Condvar,
9192 helper: OnceLock<HelperThread>,
93}
9495impl Proxy {
96pub fn new() -> Arc<Self> {
97let proxy = Arc::new(Proxy {
98 client: client(),
99 data: Mutex::new(ProxyData { used: 1, pending: 0 }),
100 wake_pending: Condvar::new(),
101 helper: OnceLock::new(),
102 });
103let proxy_ = Arc::clone(&proxy);
104let helper = proxy105 .client
106 .clone()
107 .into_helper_thread(move |token| {
108if let Ok(token) = token {
109let mut data = proxy_.data.lock();
110if data.pending > 0 {
111// Give the token to a waiting thread
112token.drop_without_releasing();
113if !(data.used > 0) {
::core::panicking::panic("assertion failed: data.used > 0")
};assert!(data.used > 0);
114data.used += 1;
115data.pending -= 1;
116proxy_.wake_pending.notify_one();
117 } else {
118// The token is no longer needed, drop it.
119drop(data);
120drop(token);
121 }
122 }
123 })
124 .expect("failed to create helper thread");
125proxy.helper.set(helper).unwrap();
126proxy127 }
128129pub fn acquire_thread(&self) {
130let mut data = self.data.lock();
131132if data.used == 0 {
133// There was a free token around. This can
134 // happen when all threads release their token.
135match (&data.pending, &0) {
(left_val, right_val) => {
if !(*left_val == *right_val) {
let kind = ::core::panicking::AssertKind::Eq;
::core::panicking::assert_failed(kind, &*left_val, &*right_val,
::core::option::Option::None);
}
}
};assert_eq!(data.pending, 0);
136data.used += 1;
137 } else {
138// Request a token from the helper thread. We can't directly use `acquire_raw`
139 // as we also need to be able to wait for the final token in the process which
140 // does not get a corresponding `release_raw` call.
141self.helper.get().unwrap().request_token();
142data.pending += 1;
143self.wake_pending.wait(&mut data);
144 }
145 }
146147pub fn release_thread(&self) {
148let mut data = self.data.lock();
149150if data.pending > 0 {
151// Give the token to a waiting thread
152data.pending -= 1;
153self.wake_pending.notify_one();
154 } else {
155data.used -= 1;
156157// Release the token unless it's the last one in the process
158if data.used > 0 {
159drop(data);
160self.client.release_raw().ok();
161 }
162 }
163 }
164}