// cargo/core/compiler/job_queue/job_state.rs
1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::CargoResult;
8use crate::core::compiler::future_incompat::FutureBreakageItem;
9use crate::core::compiler::timings::SectionTiming;
10use crate::util::Queue;
11
12use super::{Artifact, DiagDedupe, Job, JobId, Message};
13
/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
/// necessary to communicate between the main thread and the execution of the job.
///
/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
/// main thread, the `output` field must be set to prevent a deadlock.
pub struct JobState<'a, 'gctx> {
    /// Channel back to the main thread to coordinate messages and such.
    ///
    /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
    /// the message queue to prevent a deadlock.
    messages: Arc<Queue<Message>>,

    /// Normally output is sent to the job queue with backpressure. When the job is fresh
    /// however we need to immediately display the output to prevent a deadlock as the
    /// output messages are processed on the same thread as they are sent from. `output`
    /// defines where to output in this case.
    ///
    /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
    /// be passed between threads. This means that it isn't possible for multiple output messages
    /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
    /// interleaving is still prevented as the lock would be held for the whole printing of an
    /// output message.
    ///
    /// [`Shell`]: crate::core::Shell
    /// [`GlobalContext`]: crate::GlobalContext
    output: Option<&'a DiagDedupe<'gctx>>,

    /// The job id that this state is associated with, used when sending
    /// messages back to the main thread.
    id: JobId,

    /// Whether or not we're expected to have a call to `rmeta_produced`. Once
    /// that method is called this is dynamically set to `false` to prevent
    /// sending a double message later on.
    ///
    /// `Cell` provides interior mutability so `rmeta_produced` can clear the
    /// flag through a shared `&self` reference.
    rmeta_required: Cell<bool>,

    // Historical versions of Cargo made use of the `'a` argument here, so to
    // leave the door open to future refactorings keep it here.
    _marker: marker::PhantomData<&'a ()>,
}
54
55impl<'a, 'gctx> JobState<'a, 'gctx> {
56 pub(super) fn new(
57 id: JobId,
58 messages: Arc<Queue<Message>>,
59 output: Option<&'a DiagDedupe<'gctx>>,
60 rmeta_required: bool,
61 ) -> Self {
62 Self {
63 id,
64 messages,
65 output,
66 rmeta_required: Cell::new(rmeta_required),
67 _marker: marker::PhantomData,
68 }
69 }
70
71 pub fn running(&self, cmd: &ProcessBuilder) {
72 self.messages.push(Message::Run(self.id, cmd.to_string()));
73 }
74
75 pub fn stdout(&self, stdout: String) -> CargoResult<()> {
76 if let Some(dedupe) = self.output {
77 writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
78 } else {
79 self.messages.push_bounded(Message::Stdout(stdout));
80 }
81 Ok(())
82 }
83
84 pub fn stderr(&self, stderr: String) -> CargoResult<()> {
85 if let Some(dedupe) = self.output {
86 let mut shell = dedupe.gctx.shell();
87 shell.print_ansi_stderr(stderr.as_bytes())?;
88 shell.err().write_all(b"\n")?;
89 } else {
90 self.messages.push_bounded(Message::Stderr(stderr));
91 }
92 Ok(())
93 }
94
95 /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
96 pub fn emit_diag(
97 &self,
98 level: &str,
99 diag: String,
100 lint: bool,
101 fixable: bool,
102 ) -> CargoResult<()> {
103 if let Some(dedupe) = self.output {
104 let emitted = dedupe.emit_diag(&diag)?;
105 if level == "warning" {
106 self.messages.push(Message::WarningCount {
107 id: self.id,
108 lint,
109 emitted,
110 fixable,
111 });
112 }
113 } else {
114 self.messages.push_bounded(Message::Diagnostic {
115 id: self.id,
116 level: level.to_string(),
117 diag,
118 lint,
119 fixable,
120 });
121 }
122 Ok(())
123 }
124
125 /// See [`Message::Warning`].
126 pub fn warning(&self, warning: String) {
127 self.messages.push_bounded(Message::Warning {
128 id: self.id,
129 warning,
130 });
131 }
132
133 /// A method used to signal to the coordinator thread that the rmeta file
134 /// for an rlib has been produced. This is only called for some rmeta
135 /// builds when required, and can be called at any time before a job ends.
136 /// This should only be called once because a metadata file can only be
137 /// produced once!
138 pub fn rmeta_produced(&self) {
139 self.rmeta_required.set(false);
140 self.messages
141 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
142 }
143
144 pub fn on_section_timing_emitted(&self, section: SectionTiming) {
145 self.messages.push(Message::SectionTiming(self.id, section));
146 }
147
    /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
    /// sent even if our job panics.
    pub(super) fn run_to_finish(self, job: Job) {
        // Construct the drop guard *before* running the job so that an
        // unwinding panic inside `job.run` still sends `Finish(Artifact::All)`
        // from `FinishOnDrop::drop` below.
        let mut sender = FinishOnDrop {
            messages: &self.messages,
            id: self.id,
            result: None,
        };
        sender.result = Some(job.run(&self));

        // If the `rmeta_required` wasn't consumed but it was set
        // previously, then we either have:
        //
        // 1. The `job` didn't do anything because it was "fresh".
        // 2. The `job` returned an error and didn't reach the point where
        // it called `rmeta_produced`.
        // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
        //
        // Ruling out the third, the other two are pretty common for 2
        // we'll just naturally abort the compilation operation but for 1
        // we need to make sure that the metadata is flagged as produced so
        // send a synthetic message here.
        if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
            self.messages
                .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
        }

        // Use a helper struct with a `Drop` implementation to guarantee
        // that a `Finish` message is sent even if our job panics. We
        // shouldn't panic unless there's a bug in Cargo, so we just need
        // to make sure nothing hangs by accident.
        struct FinishOnDrop<'a> {
            // Borrowed (not cloned) queue handle; `self` outlives the guard.
            messages: &'a Queue<Message>,
            id: JobId,
            // `None` until `job.run` returns; still `None` inside `drop`
            // means the job panicked before producing a result.
            result: Option<CargoResult<()>>,
        }

        impl Drop for FinishOnDrop<'_> {
            // Runs when `sender` goes out of scope at the end of
            // `run_to_finish` -- after the synthetic `Metadata` message above,
            // so `Finish(Artifact::All)` is always the last message -- or
            // during unwinding if the job panicked.
            fn drop(&mut self) {
                let result = self
                    .result
                    .take()
                    .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
                self.messages
                    .push(Message::Finish(self.id, Artifact::All, result));
            }
        }
    }
196
197 pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
198 self.messages
199 .push(Message::FutureIncompatReport(self.id, report));
200 }
201}