cargo/core/compiler/job_queue/job_state.rs
1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::core::compiler::future_incompat::FutureBreakageItem;
8use crate::core::compiler::locking::LockKey;
9use crate::core::compiler::timings::SectionTiming;
10use crate::util::Queue;
11use crate::{CargoResult, core::compiler::locking::LockManager};
12
13use super::{Artifact, DiagDedupe, Job, JobId, Message};
14
15/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
16/// necessary to communicate between the main thread and the execution of the job.
17///
18/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
19/// main thread, the `output` field must be set to prevent a deadlock.
20pub struct JobState<'a, 'gctx> {
21 /// Channel back to the main thread to coordinate messages and such.
22 ///
23 /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
24 /// the message queue to prevent a deadlock.
25 messages: Arc<Queue<Message>>,
26
27 /// Normally output is sent to the job queue with backpressure. When the job is fresh
28 /// however we need to immediately display the output to prevent a deadlock as the
29 /// output messages are processed on the same thread as they are sent from. `output`
30 /// defines where to output in this case.
31 ///
32 /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
33 /// be passed between threads. This means that it isn't possible for multiple output messages
34 /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
35 /// interleaving is still prevented as the lock would be held for the whole printing of an
36 /// output message.
37 ///
38 /// [`Shell`]: crate::core::Shell
39 /// [`GlobalContext`]: crate::GlobalContext
40 output: Option<&'a DiagDedupe<'gctx>>,
41
42 /// The job id that this state is associated with, used when sending
43 /// messages back to the main thread.
44 id: JobId,
45
46 /// Whether or not we're expected to have a call to `rmeta_produced`. Once
47 /// that method is called this is dynamically set to `false` to prevent
48 /// sending a double message later on.
49 rmeta_required: Cell<bool>,
50
51 /// Manages locks for build units when fine grain locking is enabled.
52 lock_manager: Arc<LockManager>,
53
54 // Historical versions of Cargo made use of the `'a` argument here, so to
55 // leave the door open to future refactorings keep it here.
56 _marker: marker::PhantomData<&'a ()>,
57}
58
59impl<'a, 'gctx> JobState<'a, 'gctx> {
60 pub(super) fn new(
61 id: JobId,
62 messages: Arc<Queue<Message>>,
63 output: Option<&'a DiagDedupe<'gctx>>,
64 rmeta_required: bool,
65 lock_manager: Arc<LockManager>,
66 ) -> Self {
67 Self {
68 id,
69 messages,
70 output,
71 rmeta_required: Cell::new(rmeta_required),
72 lock_manager,
73 _marker: marker::PhantomData,
74 }
75 }
76
77 pub fn running(&self, cmd: &ProcessBuilder) {
78 self.messages.push(Message::Run(self.id, cmd.to_string()));
79 }
80
81 pub fn stdout(&self, stdout: String) -> CargoResult<()> {
82 if let Some(dedupe) = self.output {
83 writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
84 } else {
85 self.messages.push_bounded(Message::Stdout(stdout));
86 }
87 Ok(())
88 }
89
90 pub fn stderr(&self, stderr: String) -> CargoResult<()> {
91 if let Some(dedupe) = self.output {
92 let mut shell = dedupe.gctx.shell();
93 shell.print_ansi_stderr(stderr.as_bytes())?;
94 shell.err().write_all(b"\n")?;
95 } else {
96 self.messages.push_bounded(Message::Stderr(stderr));
97 }
98 Ok(())
99 }
100
101 /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
102 pub fn emit_diag(
103 &self,
104 level: &str,
105 diag: String,
106 lint: bool,
107 fixable: bool,
108 ) -> CargoResult<()> {
109 if let Some(dedupe) = self.output {
110 let emitted = dedupe.emit_diag(&diag)?;
111 if level == "warning" {
112 self.messages.push(Message::WarningCount {
113 id: self.id,
114 lint,
115 emitted,
116 fixable,
117 });
118 }
119 } else {
120 self.messages.push_bounded(Message::Diagnostic {
121 id: self.id,
122 level: level.to_string(),
123 diag,
124 lint,
125 fixable,
126 });
127 }
128 Ok(())
129 }
130
131 /// See [`Message::Warning`].
132 pub fn warning(&self, warning: String) {
133 self.messages.push_bounded(Message::Warning {
134 id: self.id,
135 warning,
136 });
137 }
138
139 /// A method used to signal to the coordinator thread that the rmeta file
140 /// for an rlib has been produced. This is only called for some rmeta
141 /// builds when required, and can be called at any time before a job ends.
142 /// This should only be called once because a metadata file can only be
143 /// produced once!
144 pub fn rmeta_produced(&self) {
145 self.rmeta_required.set(false);
146 self.messages
147 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
148 }
149
150 pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
151 self.lock_manager.lock(lock)
152 }
153
154 pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
155 self.lock_manager.downgrade_to_shared(lock)
156 }
157
158 pub fn on_section_timing_emitted(&self, section: SectionTiming) {
159 self.messages.push(Message::SectionTiming(self.id, section));
160 }
161
162 /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
163 /// sent even if our job panics.
164 pub(super) fn run_to_finish(self, job: Job) {
165 let mut sender = FinishOnDrop {
166 messages: &self.messages,
167 id: self.id,
168 result: None,
169 };
170 sender.result = Some(job.run(&self));
171
172 // If the `rmeta_required` wasn't consumed but it was set
173 // previously, then we either have:
174 //
175 // 1. The `job` didn't do anything because it was "fresh".
176 // 2. The `job` returned an error and didn't reach the point where
177 // it called `rmeta_produced`.
178 // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
179 //
180 // Ruling out the third, the other two are pretty common for 2
181 // we'll just naturally abort the compilation operation but for 1
182 // we need to make sure that the metadata is flagged as produced so
183 // send a synthetic message here.
184 if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
185 self.messages
186 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
187 }
188
189 // Use a helper struct with a `Drop` implementation to guarantee
190 // that a `Finish` message is sent even if our job panics. We
191 // shouldn't panic unless there's a bug in Cargo, so we just need
192 // to make sure nothing hangs by accident.
193 struct FinishOnDrop<'a> {
194 messages: &'a Queue<Message>,
195 id: JobId,
196 result: Option<CargoResult<()>>,
197 }
198
199 impl Drop for FinishOnDrop<'_> {
200 fn drop(&mut self) {
201 let result = self
202 .result
203 .take()
204 .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
205 self.messages
206 .push(Message::Finish(self.id, Artifact::All, result));
207 }
208 }
209 }
210
211 pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
212 self.messages
213 .push(Message::FutureIncompatReport(self.id, report));
214 }
215}