Skip to main content

cargo/core/compiler/job_queue/
job_state.rs

1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::core::compiler::future_incompat::FutureBreakageItem;
8use crate::core::compiler::locking::LockKey;
9use crate::core::compiler::timings::SectionTiming;
10use crate::util::Queue;
11use crate::util::context::WarningHandling;
12use crate::{CargoResult, core::compiler::locking::LockManager};
13
14use super::{Artifact, DiagDedupe, Job, JobId, Message};
15
16/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
17/// necessary to communicate between the main thread and the execution of the job.
18///
19/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
20/// main thread, the `output` field must be set to prevent a deadlock.
21pub struct JobState<'a, 'gctx> {
22    /// Channel back to the main thread to coordinate messages and such.
23    ///
24    /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
25    /// the message queue to prevent a deadlock.
26    messages: Arc<Queue<Message>>,
27
28    /// Normally output is sent to the job queue with backpressure. When the job is fresh
29    /// however we need to immediately display the output to prevent a deadlock as the
30    /// output messages are processed on the same thread as they are sent from. `output`
31    /// defines where to output in this case.
32    ///
33    /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
34    /// be passed between threads. This means that it isn't possible for multiple output messages
35    /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
36    /// interleaving is still prevented as the lock would be held for the whole printing of an
37    /// output message.
38    ///
39    /// [`Shell`]: cargo_util_terminal::Shell
40    /// [`GlobalContext`]: crate::GlobalContext
41    output: Option<&'a DiagDedupe<'gctx>>,
42
43    /// The job id that this state is associated with, used when sending
44    /// messages back to the main thread.
45    id: JobId,
46
47    /// Whether or not we're expected to have a call to `rmeta_produced`. Once
48    /// that method is called this is dynamically set to `false` to prevent
49    /// sending a double message later on.
50    rmeta_required: Cell<bool>,
51
52    /// Manages locks for build units when fine grain locking is enabled.
53    lock_manager: Arc<LockManager>,
54
55    warning_handling: WarningHandling,
56
57    // Historical versions of Cargo made use of the `'a` argument here, so to
58    // leave the door open to future refactorings keep it here.
59    _marker: marker::PhantomData<&'a ()>,
60}
61
62impl<'a, 'gctx> JobState<'a, 'gctx> {
63    pub(super) fn new(
64        id: JobId,
65        messages: Arc<Queue<Message>>,
66        output: Option<&'a DiagDedupe<'gctx>>,
67        rmeta_required: bool,
68        lock_manager: Arc<LockManager>,
69        warning_handling: WarningHandling,
70    ) -> Self {
71        Self {
72            id,
73            messages,
74            output,
75            rmeta_required: Cell::new(rmeta_required),
76            lock_manager,
77            warning_handling,
78            _marker: marker::PhantomData,
79        }
80    }
81
82    pub fn running(&self, cmd: &ProcessBuilder) {
83        self.messages.push(Message::Run(self.id, cmd.to_string()));
84    }
85
86    pub fn stdout(&self, stdout: String) -> CargoResult<()> {
87        if let Some(dedupe) = self.output {
88            writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
89        } else {
90            self.messages.push_bounded(Message::Stdout(stdout));
91        }
92        Ok(())
93    }
94
95    pub fn stderr(&self, stderr: String) -> CargoResult<()> {
96        if let Some(dedupe) = self.output {
97            let mut shell = dedupe.gctx.shell();
98            shell.print_ansi_stderr(stderr.as_bytes())?;
99            shell.err().write_all(b"\n")?;
100        } else {
101            self.messages.push_bounded(Message::Stderr(stderr));
102        }
103        Ok(())
104    }
105
106    /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
107    pub fn emit_diag(
108        &self,
109        level: &str,
110        diag: String,
111        lint: bool,
112        fixable: bool,
113    ) -> CargoResult<()> {
114        if level == "warning" && lint && self.warning_handling == WarningHandling::Allow {
115            tracing::warn!("{diag}");
116        } else if let Some(dedupe) = self.output {
117            let emitted = dedupe.emit_diag(&diag)?;
118            if level == "warning" {
119                self.messages.push(Message::WarningCount {
120                    id: self.id,
121                    lint,
122                    emitted,
123                    fixable,
124                });
125            }
126        } else {
127            self.messages.push_bounded(Message::Diagnostic {
128                id: self.id,
129                level: level.to_string(),
130                diag,
131                lint,
132                fixable,
133            });
134        }
135        Ok(())
136    }
137
138    /// See [`Message::Warning`].
139    pub fn warning(&self, warning: String) {
140        self.messages.push_bounded(Message::Warning {
141            id: self.id,
142            warning,
143        });
144    }
145
146    /// A method used to signal to the coordinator thread that the rmeta file
147    /// for an rlib has been produced. This is only called for some rmeta
148    /// builds when required, and can be called at any time before a job ends.
149    /// This should only be called once because a metadata file can only be
150    /// produced once!
151    pub fn rmeta_produced(&self) {
152        self.rmeta_required.set(false);
153        self.messages
154            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
155    }
156
157    pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
158        self.lock_manager.lock(lock)
159    }
160
161    pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
162        self.lock_manager.downgrade_to_shared(lock)
163    }
164
165    pub fn on_section_timing_emitted(&self, section: SectionTiming) {
166        self.messages.push(Message::SectionTiming(self.id, section));
167    }
168
169    /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
170    /// sent even if our job panics.
171    pub(super) fn run_to_finish(self, job: Job) {
172        let mut sender = FinishOnDrop {
173            messages: &self.messages,
174            id: self.id,
175            result: None,
176        };
177        sender.result = Some(job.run(&self));
178
179        // If the `rmeta_required` wasn't consumed but it was set
180        // previously, then we either have:
181        //
182        // 1. The `job` didn't do anything because it was "fresh".
183        // 2. The `job` returned an error and didn't reach the point where
184        //    it called `rmeta_produced`.
185        // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
186        //
187        // Ruling out the third, the other two are pretty common for 2
188        // we'll just naturally abort the compilation operation but for 1
189        // we need to make sure that the metadata is flagged as produced so
190        // send a synthetic message here.
191        if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
192            self.messages
193                .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
194        }
195
196        // Use a helper struct with a `Drop` implementation to guarantee
197        // that a `Finish` message is sent even if our job panics. We
198        // shouldn't panic unless there's a bug in Cargo, so we just need
199        // to make sure nothing hangs by accident.
200        struct FinishOnDrop<'a> {
201            messages: &'a Queue<Message>,
202            id: JobId,
203            result: Option<CargoResult<()>>,
204        }
205
206        impl Drop for FinishOnDrop<'_> {
207            fn drop(&mut self) {
208                let result = self
209                    .result
210                    .take()
211                    .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
212                self.messages
213                    .push(Message::Finish(self.id, Artifact::All, result));
214            }
215        }
216    }
217
218    pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
219        self.messages
220            .push(Message::FutureIncompatReport(self.id, report));
221    }
222
223    /// The rustc emitted the list of unused `--extern` args.
224    ///
225    /// This is useful for checking unused dependencies.
226    /// Should only be called once, as the compiler only emits it once per compilation.
227    pub fn unused_externs(&self, unused_externs: Vec<String>) {
228        self.messages
229            .push(Message::UnusedExterns(self.id, unused_externs));
230    }
231}