cargo/core/compiler/job_queue/job_state.rs
1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::core::compiler::future_incompat::FutureBreakageItem;
8use crate::core::compiler::locking::LockKey;
9use crate::core::compiler::timings::SectionTiming;
10use crate::util::Queue;
11use crate::util::context::WarningHandling;
12use crate::{CargoResult, core::compiler::locking::LockManager};
13
14use super::{Artifact, DiagDedupe, Job, JobId, Message};
15
16/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
17/// necessary to communicate between the main thread and the execution of the job.
18///
19/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
20/// main thread, the `output` field must be set to prevent a deadlock.
21pub struct JobState<'a, 'gctx> {
22 /// Channel back to the main thread to coordinate messages and such.
23 ///
24 /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
25 /// the message queue to prevent a deadlock.
26 messages: Arc<Queue<Message>>,
27
28 /// Normally output is sent to the job queue with backpressure. When the job is fresh
29 /// however we need to immediately display the output to prevent a deadlock as the
30 /// output messages are processed on the same thread as they are sent from. `output`
31 /// defines where to output in this case.
32 ///
33 /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
34 /// be passed between threads. This means that it isn't possible for multiple output messages
35 /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
36 /// interleaving is still prevented as the lock would be held for the whole printing of an
37 /// output message.
38 ///
39 /// [`Shell`]: cargo_util_terminal::Shell
40 /// [`GlobalContext`]: crate::GlobalContext
41 output: Option<&'a DiagDedupe<'gctx>>,
42
43 /// The job id that this state is associated with, used when sending
44 /// messages back to the main thread.
45 id: JobId,
46
47 /// Whether or not we're expected to have a call to `rmeta_produced`. Once
48 /// that method is called this is dynamically set to `false` to prevent
49 /// sending a double message later on.
50 rmeta_required: Cell<bool>,
51
52 /// Manages locks for build units when fine grain locking is enabled.
53 lock_manager: Arc<LockManager>,
54
55 warning_handling: WarningHandling,
56
57 // Historical versions of Cargo made use of the `'a` argument here, so to
58 // leave the door open to future refactorings keep it here.
59 _marker: marker::PhantomData<&'a ()>,
60}
61
62impl<'a, 'gctx> JobState<'a, 'gctx> {
63 pub(super) fn new(
64 id: JobId,
65 messages: Arc<Queue<Message>>,
66 output: Option<&'a DiagDedupe<'gctx>>,
67 rmeta_required: bool,
68 lock_manager: Arc<LockManager>,
69 warning_handling: WarningHandling,
70 ) -> Self {
71 Self {
72 id,
73 messages,
74 output,
75 rmeta_required: Cell::new(rmeta_required),
76 lock_manager,
77 warning_handling,
78 _marker: marker::PhantomData,
79 }
80 }
81
82 pub fn running(&self, cmd: &ProcessBuilder) {
83 self.messages.push(Message::Run(self.id, cmd.to_string()));
84 }
85
86 pub fn stdout(&self, stdout: String) -> CargoResult<()> {
87 if let Some(dedupe) = self.output {
88 writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
89 } else {
90 self.messages.push_bounded(Message::Stdout(stdout));
91 }
92 Ok(())
93 }
94
95 pub fn stderr(&self, stderr: String) -> CargoResult<()> {
96 if let Some(dedupe) = self.output {
97 let mut shell = dedupe.gctx.shell();
98 shell.print_ansi_stderr(stderr.as_bytes())?;
99 shell.err().write_all(b"\n")?;
100 } else {
101 self.messages.push_bounded(Message::Stderr(stderr));
102 }
103 Ok(())
104 }
105
106 /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
107 pub fn emit_diag(
108 &self,
109 level: &str,
110 diag: String,
111 lint: bool,
112 fixable: bool,
113 ) -> CargoResult<()> {
114 if level == "warning" && lint && self.warning_handling == WarningHandling::Allow {
115 tracing::warn!("{diag}");
116 } else if let Some(dedupe) = self.output {
117 let emitted = dedupe.emit_diag(&diag)?;
118 if level == "warning" {
119 self.messages.push(Message::WarningCount {
120 id: self.id,
121 lint,
122 emitted,
123 fixable,
124 });
125 }
126 } else {
127 self.messages.push_bounded(Message::Diagnostic {
128 id: self.id,
129 level: level.to_string(),
130 diag,
131 lint,
132 fixable,
133 });
134 }
135 Ok(())
136 }
137
138 /// See [`Message::Warning`].
139 pub fn warning(&self, warning: String) {
140 self.messages.push_bounded(Message::Warning {
141 id: self.id,
142 warning,
143 });
144 }
145
146 /// A method used to signal to the coordinator thread that the rmeta file
147 /// for an rlib has been produced. This is only called for some rmeta
148 /// builds when required, and can be called at any time before a job ends.
149 /// This should only be called once because a metadata file can only be
150 /// produced once!
151 pub fn rmeta_produced(&self) {
152 self.rmeta_required.set(false);
153 self.messages
154 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
155 }
156
157 pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
158 self.lock_manager.lock(lock)
159 }
160
161 pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
162 self.lock_manager.downgrade_to_shared(lock)
163 }
164
165 pub fn on_section_timing_emitted(&self, section: SectionTiming) {
166 self.messages.push(Message::SectionTiming(self.id, section));
167 }
168
169 /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
170 /// sent even if our job panics.
171 pub(super) fn run_to_finish(self, job: Job) {
172 let mut sender = FinishOnDrop {
173 messages: &self.messages,
174 id: self.id,
175 result: None,
176 };
177 sender.result = Some(job.run(&self));
178
179 // If the `rmeta_required` wasn't consumed but it was set
180 // previously, then we either have:
181 //
182 // 1. The `job` didn't do anything because it was "fresh".
183 // 2. The `job` returned an error and didn't reach the point where
184 // it called `rmeta_produced`.
185 // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
186 //
187 // Ruling out the third, the other two are pretty common for 2
188 // we'll just naturally abort the compilation operation but for 1
189 // we need to make sure that the metadata is flagged as produced so
190 // send a synthetic message here.
191 if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
192 self.messages
193 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
194 }
195
196 // Use a helper struct with a `Drop` implementation to guarantee
197 // that a `Finish` message is sent even if our job panics. We
198 // shouldn't panic unless there's a bug in Cargo, so we just need
199 // to make sure nothing hangs by accident.
200 struct FinishOnDrop<'a> {
201 messages: &'a Queue<Message>,
202 id: JobId,
203 result: Option<CargoResult<()>>,
204 }
205
206 impl Drop for FinishOnDrop<'_> {
207 fn drop(&mut self) {
208 let result = self
209 .result
210 .take()
211 .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
212 self.messages
213 .push(Message::Finish(self.id, Artifact::All, result));
214 }
215 }
216 }
217
218 pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
219 self.messages
220 .push(Message::FutureIncompatReport(self.id, report));
221 }
222
223 /// The rustc emitted the list of unused `--extern` args.
224 ///
225 /// This is useful for checking unused dependencies.
226 /// Should only be called once, as the compiler only emits it once per compilation.
227 pub fn unused_externs(&self, unused_externs: Vec<String>) {
228 self.messages
229 .push(Message::UnusedExterns(self.id, unused_externs));
230 }
231}