cargo/core/compiler/job_queue/job_state.rs

//! See [`JobState`].

use std::{cell::Cell, io::Write, marker, sync::Arc};

use cargo_util::ProcessBuilder;

use crate::CargoResult;
use crate::core::compiler::future_incompat::FutureBreakageItem;
use crate::core::compiler::locking::{LockKey, LockManager};
use crate::core::compiler::timings::SectionTiming;
use crate::util::Queue;

use super::{Artifact, DiagDedupe, Job, JobId, Message};

/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
/// necessary to communicate between the main thread and the execution of the job.
///
/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
/// main thread, the `output` field must be set to prevent a deadlock.
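///
/// # Example
///
/// A minimal, illustrative sketch of how a job body might report progress
/// through its `JobState` (the `run_job` function here is hypothetical):
///
/// ```ignore
/// fn run_job(state: &JobState<'_, '_>) -> CargoResult<()> {
///     let cmd = cargo_util::ProcessBuilder::new("rustc");
///     // Tell the coordinator which command is running for this job id.
///     state.running(&cmd);
///     // Forward captured output back to the main thread (or print it
///     // directly when the job runs on the main thread).
///     state.stdout(String::from("..."))?;
///     state.stderr(String::from("..."))?;
///     Ok(())
/// }
/// ```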
pub struct JobState<'a, 'gctx> {
    /// Channel back to the main thread to coordinate messages and such.
    ///
    /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
    /// the message queue to prevent a deadlock.
    messages: Arc<Queue<Message>>,

    /// Normally output is sent to the job queue with backpressure. When the job is fresh,
    /// however, the output must be displayed immediately to prevent a deadlock, because the
    /// output messages are processed on the same thread they are sent from. `output`
    /// defines where to write the output in this case.
    ///
    /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
    /// be passed between threads. This means that it isn't possible for multiple output messages
    /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
    /// interleaving is still prevented as the lock would be held for the whole printing of an
    /// output message.
    ///
    /// [`Shell`]: crate::core::Shell
    /// [`GlobalContext`]: crate::GlobalContext
    output: Option<&'a DiagDedupe<'gctx>>,

    /// The job id that this state is associated with, used when sending
    /// messages back to the main thread.
    id: JobId,

    /// Whether or not we're expected to have a call to `rmeta_produced`. Once
    /// that method is called, this is dynamically set to `false` to prevent
    /// sending a double message later on.
    rmeta_required: Cell<bool>,

    /// Manages locks for build units when fine-grained locking is enabled.
    lock_manager: Arc<LockManager>,

    // Historical versions of Cargo made use of the `'a` argument here, so to
    // leave the door open to future refactorings, keep it here.
    _marker: marker::PhantomData<&'a ()>,
}

impl<'a, 'gctx> JobState<'a, 'gctx> {
    pub(super) fn new(
        id: JobId,
        messages: Arc<Queue<Message>>,
        output: Option<&'a DiagDedupe<'gctx>>,
        rmeta_required: bool,
        lock_manager: Arc<LockManager>,
    ) -> Self {
        Self {
            id,
            messages,
            output,
            rmeta_required: Cell::new(rmeta_required),
            lock_manager,
            _marker: marker::PhantomData,
        }
    }

    pub fn running(&self, cmd: &ProcessBuilder) {
        self.messages.push(Message::Run(self.id, cmd.to_string()));
    }

    pub fn stdout(&self, stdout: String) -> CargoResult<()> {
        if let Some(dedupe) = self.output {
            writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
        } else {
            self.messages.push_bounded(Message::Stdout(stdout));
        }
        Ok(())
    }

    pub fn stderr(&self, stderr: String) -> CargoResult<()> {
        if let Some(dedupe) = self.output {
            let mut shell = dedupe.gctx.shell();
            shell.print_ansi_stderr(stderr.as_bytes())?;
            shell.err().write_all(b"\n")?;
        } else {
            self.messages.push_bounded(Message::Stderr(stderr));
        }
        Ok(())
    }

    /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
    pub fn emit_diag(
        &self,
        level: &str,
        diag: String,
        lint: bool,
        fixable: bool,
    ) -> CargoResult<()> {
        if let Some(dedupe) = self.output {
            let emitted = dedupe.emit_diag(&diag)?;
            if level == "warning" {
                self.messages.push(Message::WarningCount {
                    id: self.id,
                    lint,
                    emitted,
                    fixable,
                });
            }
        } else {
            self.messages.push_bounded(Message::Diagnostic {
                id: self.id,
                level: level.to_string(),
                diag,
                lint,
                fixable,
            });
        }
        Ok(())
    }

    /// See [`Message::Warning`].
    pub fn warning(&self, warning: String) {
        self.messages.push_bounded(Message::Warning {
            id: self.id,
            warning,
        });
    }

    /// A method used to signal to the coordinator thread that the rmeta file
    /// for an rlib has been produced. This is only called for some rmeta
    /// builds when required, and can be called at any time before a job ends.
    /// This should only be called once because a metadata file can only be
    /// produced once!
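    ///
    /// # Example
    ///
    /// An illustrative sketch only (the job body shown is hypothetical):
    ///
    /// ```ignore
    /// // Once rustc has emitted the `.rmeta` file, signal the coordinator so
    /// // that units waiting only on this crate's metadata can start building.
    /// state.rmeta_produced();
    /// ```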
    pub fn rmeta_produced(&self) {
        self.rmeta_required.set(false);
        self.messages
            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
    }

    /// Acquires the lock identified by `lock` exclusively via the shared
    /// [`LockManager`].
    pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
        self.lock_manager.lock(lock)
    }

    /// Downgrades a previously acquired exclusive lock on `lock` to a shared
    /// lock.
    pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
        self.lock_manager.downgrade_to_shared(lock)
    }

    /// Forwards a compiler section timing report to the main thread.
    /// See [`Message::SectionTiming`].
    pub fn on_section_timing_emitted(&self, section: SectionTiming) {
        self.messages.push(Message::SectionTiming(self.id, section));
    }

    /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
    /// sent even if our job panics.
    pub(super) fn run_to_finish(self, job: Job) {
        let mut sender = FinishOnDrop {
            messages: &self.messages,
            id: self.id,
            result: None,
        };
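        // Run the job itself. If `job.run` panics, `result` stays `None` and
        // the `FinishOnDrop` destructor below reports a "worker panicked"
        // error to the coordinator.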
        sender.result = Some(job.run(&self));

        // If the `rmeta_required` flag wasn't consumed but it was set
        // previously, then we either have:
        //
        // 1. The `job` didn't do anything because it was "fresh".
        // 2. The `job` returned an error and didn't reach the point where
        //    it called `rmeta_produced`.
        // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
        //
        // Ruling out the third, the other two are pretty common. For 2
        // we'll just naturally abort the compilation operation, but for 1
        // we need to make sure that the metadata is flagged as produced, so
        // send a synthetic message here.
        if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
            self.messages
                .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
        }

        // Use a helper struct with a `Drop` implementation to guarantee
        // that a `Finish` message is sent even if our job panics. We
        // shouldn't panic unless there's a bug in Cargo, so we just need
        // to make sure nothing hangs by accident.
        struct FinishOnDrop<'a> {
            messages: &'a Queue<Message>,
            id: JobId,
            result: Option<CargoResult<()>>,
        }

        impl Drop for FinishOnDrop<'_> {
            fn drop(&mut self) {
                let result = self
                    .result
                    .take()
                    .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
                self.messages
                    .push(Message::Finish(self.id, Artifact::All, result));
            }
        }
    }

    pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
        self.messages
            .push(Message::FutureIncompatReport(self.id, report));
    }
}