cargo/core/compiler/job_queue/
job_state.rs

1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::CargoResult;
8use crate::core::compiler::build_runner::OutputFile;
9use crate::core::compiler::future_incompat::FutureBreakageItem;
10use crate::core::compiler::timings::SectionTiming;
11use crate::util::Queue;
12
13use super::{Artifact, DiagDedupe, Job, JobId, Message};
14
15/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
16/// necessary to communicate between the main thread and the execution of the job.
17///
18/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
19/// main thread, the `output` field must be set to prevent a deadlock.
20pub struct JobState<'a, 'gctx> {
21    /// Channel back to the main thread to coordinate messages and such.
22    ///
23    /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
24    /// the message queue to prevent a deadlock.
25    messages: Arc<Queue<Message>>,
26
27    /// Normally output is sent to the job queue with backpressure. When the job is fresh
28    /// however we need to immediately display the output to prevent a deadlock as the
29    /// output messages are processed on the same thread as they are sent from. `output`
30    /// defines where to output in this case.
31    ///
32    /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
33    /// be passed between threads. This means that it isn't possible for multiple output messages
34    /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
35    /// interleaving is still prevented as the lock would be held for the whole printing of an
36    /// output message.
37    ///
38    /// [`Shell`]: crate::core::Shell
39    /// [`GlobalContext`]: crate::GlobalContext
40    output: Option<&'a DiagDedupe<'gctx>>,
41
42    /// The job id that this state is associated with, used when sending
43    /// messages back to the main thread.
44    id: JobId,
45
46    /// Whether or not we're expected to have a call to `rmeta_produced`. Once
47    /// that method is called this is dynamically set to `false` to prevent
48    /// sending a double message later on.
49    rmeta_required: Cell<bool>,
50
51    // Historical versions of Cargo made use of the `'a` argument here, so to
52    // leave the door open to future refactorings keep it here.
53    _marker: marker::PhantomData<&'a ()>,
54}
55
56impl<'a, 'gctx> JobState<'a, 'gctx> {
57    pub(super) fn new(
58        id: JobId,
59        messages: Arc<Queue<Message>>,
60        output: Option<&'a DiagDedupe<'gctx>>,
61        rmeta_required: bool,
62    ) -> Self {
63        Self {
64            id,
65            messages,
66            output,
67            rmeta_required: Cell::new(rmeta_required),
68            _marker: marker::PhantomData,
69        }
70    }
71
72    pub fn running(&self, cmd: &ProcessBuilder) {
73        self.messages.push(Message::Run(self.id, cmd.to_string()));
74    }
75
76    pub fn build_plan(
77        &self,
78        module_name: String,
79        cmd: ProcessBuilder,
80        filenames: Arc<Vec<OutputFile>>,
81    ) {
82        self.messages
83            .push(Message::BuildPlanMsg(module_name, cmd, filenames));
84    }
85
86    pub fn stdout(&self, stdout: String) -> CargoResult<()> {
87        if let Some(dedupe) = self.output {
88            writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
89        } else {
90            self.messages.push_bounded(Message::Stdout(stdout));
91        }
92        Ok(())
93    }
94
95    pub fn stderr(&self, stderr: String) -> CargoResult<()> {
96        if let Some(dedupe) = self.output {
97            let mut shell = dedupe.gctx.shell();
98            shell.print_ansi_stderr(stderr.as_bytes())?;
99            shell.err().write_all(b"\n")?;
100        } else {
101            self.messages.push_bounded(Message::Stderr(stderr));
102        }
103        Ok(())
104    }
105
106    /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
107    pub fn emit_diag(&self, level: &str, diag: String, fixable: bool) -> CargoResult<()> {
108        if let Some(dedupe) = self.output {
109            let emitted = dedupe.emit_diag(&diag)?;
110            if level == "warning" {
111                self.messages.push(Message::WarningCount {
112                    id: self.id,
113                    emitted,
114                    fixable,
115                });
116            }
117        } else {
118            self.messages.push_bounded(Message::Diagnostic {
119                id: self.id,
120                level: level.to_string(),
121                diag,
122                fixable,
123            });
124        }
125        Ok(())
126    }
127
128    /// See [`Message::Warning`].
129    pub fn warning(&self, warning: String) {
130        self.messages.push_bounded(Message::Warning {
131            id: self.id,
132            warning,
133        });
134    }
135
136    /// A method used to signal to the coordinator thread that the rmeta file
137    /// for an rlib has been produced. This is only called for some rmeta
138    /// builds when required, and can be called at any time before a job ends.
139    /// This should only be called once because a metadata file can only be
140    /// produced once!
141    pub fn rmeta_produced(&self) {
142        self.rmeta_required.set(false);
143        self.messages
144            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
145    }
146
147    pub fn on_section_timing_emitted(&self, section: SectionTiming) {
148        self.messages.push(Message::SectionTiming(self.id, section));
149    }
150
151    /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
152    /// sent even if our job panics.
153    pub(super) fn run_to_finish(self, job: Job) {
154        let mut sender = FinishOnDrop {
155            messages: &self.messages,
156            id: self.id,
157            result: None,
158        };
159        sender.result = Some(job.run(&self));
160
161        // If the `rmeta_required` wasn't consumed but it was set
162        // previously, then we either have:
163        //
164        // 1. The `job` didn't do anything because it was "fresh".
165        // 2. The `job` returned an error and didn't reach the point where
166        //    it called `rmeta_produced`.
167        // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
168        //
169        // Ruling out the third, the other two are pretty common for 2
170        // we'll just naturally abort the compilation operation but for 1
171        // we need to make sure that the metadata is flagged as produced so
172        // send a synthetic message here.
173        if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
174            self.messages
175                .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
176        }
177
178        // Use a helper struct with a `Drop` implementation to guarantee
179        // that a `Finish` message is sent even if our job panics. We
180        // shouldn't panic unless there's a bug in Cargo, so we just need
181        // to make sure nothing hangs by accident.
182        struct FinishOnDrop<'a> {
183            messages: &'a Queue<Message>,
184            id: JobId,
185            result: Option<CargoResult<()>>,
186        }
187
188        impl Drop for FinishOnDrop<'_> {
189            fn drop(&mut self) {
190                let result = self
191                    .result
192                    .take()
193                    .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
194                self.messages
195                    .push(Message::Finish(self.id, Artifact::All, result));
196            }
197        }
198    }
199
200    pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
201        self.messages
202            .push(Message::FutureIncompatReport(self.id, report));
203    }
204}