cargo/core/compiler/job_queue/
job_state.rs

1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::CargoResult;
8use crate::core::compiler::future_incompat::FutureBreakageItem;
9use crate::core::compiler::timings::SectionTiming;
10use crate::util::Queue;
11
12use super::{Artifact, DiagDedupe, Job, JobId, Message};
13
14/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
15/// necessary to communicate between the main thread and the execution of the job.
16///
17/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
18/// main thread, the `output` field must be set to prevent a deadlock.
19pub struct JobState<'a, 'gctx> {
20    /// Channel back to the main thread to coordinate messages and such.
21    ///
22    /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
23    /// the message queue to prevent a deadlock.
24    messages: Arc<Queue<Message>>,
25
26    /// Normally output is sent to the job queue with backpressure. When the job is fresh
27    /// however we need to immediately display the output to prevent a deadlock as the
28    /// output messages are processed on the same thread as they are sent from. `output`
29    /// defines where to output in this case.
30    ///
31    /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
32    /// be passed between threads. This means that it isn't possible for multiple output messages
33    /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
34    /// interleaving is still prevented as the lock would be held for the whole printing of an
35    /// output message.
36    ///
37    /// [`Shell`]: crate::core::Shell
38    /// [`GlobalContext`]: crate::GlobalContext
39    output: Option<&'a DiagDedupe<'gctx>>,
40
41    /// The job id that this state is associated with, used when sending
42    /// messages back to the main thread.
43    id: JobId,
44
45    /// Whether or not we're expected to have a call to `rmeta_produced`. Once
46    /// that method is called this is dynamically set to `false` to prevent
47    /// sending a double message later on.
48    rmeta_required: Cell<bool>,
49
50    // Historical versions of Cargo made use of the `'a` argument here, so to
51    // leave the door open to future refactorings keep it here.
52    _marker: marker::PhantomData<&'a ()>,
53}
54
55impl<'a, 'gctx> JobState<'a, 'gctx> {
56    pub(super) fn new(
57        id: JobId,
58        messages: Arc<Queue<Message>>,
59        output: Option<&'a DiagDedupe<'gctx>>,
60        rmeta_required: bool,
61    ) -> Self {
62        Self {
63            id,
64            messages,
65            output,
66            rmeta_required: Cell::new(rmeta_required),
67            _marker: marker::PhantomData,
68        }
69    }
70
71    pub fn running(&self, cmd: &ProcessBuilder) {
72        self.messages.push(Message::Run(self.id, cmd.to_string()));
73    }
74
75    pub fn stdout(&self, stdout: String) -> CargoResult<()> {
76        if let Some(dedupe) = self.output {
77            writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
78        } else {
79            self.messages.push_bounded(Message::Stdout(stdout));
80        }
81        Ok(())
82    }
83
84    pub fn stderr(&self, stderr: String) -> CargoResult<()> {
85        if let Some(dedupe) = self.output {
86            let mut shell = dedupe.gctx.shell();
87            shell.print_ansi_stderr(stderr.as_bytes())?;
88            shell.err().write_all(b"\n")?;
89        } else {
90            self.messages.push_bounded(Message::Stderr(stderr));
91        }
92        Ok(())
93    }
94
95    /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
96    pub fn emit_diag(
97        &self,
98        level: &str,
99        diag: String,
100        lint: bool,
101        fixable: bool,
102    ) -> CargoResult<()> {
103        if let Some(dedupe) = self.output {
104            let emitted = dedupe.emit_diag(&diag)?;
105            if level == "warning" {
106                self.messages.push(Message::WarningCount {
107                    id: self.id,
108                    lint,
109                    emitted,
110                    fixable,
111                });
112            }
113        } else {
114            self.messages.push_bounded(Message::Diagnostic {
115                id: self.id,
116                level: level.to_string(),
117                diag,
118                lint,
119                fixable,
120            });
121        }
122        Ok(())
123    }
124
125    /// See [`Message::Warning`].
126    pub fn warning(&self, warning: String) {
127        self.messages.push_bounded(Message::Warning {
128            id: self.id,
129            warning,
130        });
131    }
132
133    /// A method used to signal to the coordinator thread that the rmeta file
134    /// for an rlib has been produced. This is only called for some rmeta
135    /// builds when required, and can be called at any time before a job ends.
136    /// This should only be called once because a metadata file can only be
137    /// produced once!
138    pub fn rmeta_produced(&self) {
139        self.rmeta_required.set(false);
140        self.messages
141            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
142    }
143
144    pub fn on_section_timing_emitted(&self, section: SectionTiming) {
145        self.messages.push(Message::SectionTiming(self.id, section));
146    }
147
148    /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
149    /// sent even if our job panics.
150    pub(super) fn run_to_finish(self, job: Job) {
151        let mut sender = FinishOnDrop {
152            messages: &self.messages,
153            id: self.id,
154            result: None,
155        };
156        sender.result = Some(job.run(&self));
157
158        // If the `rmeta_required` wasn't consumed but it was set
159        // previously, then we either have:
160        //
161        // 1. The `job` didn't do anything because it was "fresh".
162        // 2. The `job` returned an error and didn't reach the point where
163        //    it called `rmeta_produced`.
164        // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
165        //
166        // Ruling out the third, the other two are pretty common for 2
167        // we'll just naturally abort the compilation operation but for 1
168        // we need to make sure that the metadata is flagged as produced so
169        // send a synthetic message here.
170        if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
171            self.messages
172                .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
173        }
174
175        // Use a helper struct with a `Drop` implementation to guarantee
176        // that a `Finish` message is sent even if our job panics. We
177        // shouldn't panic unless there's a bug in Cargo, so we just need
178        // to make sure nothing hangs by accident.
179        struct FinishOnDrop<'a> {
180            messages: &'a Queue<Message>,
181            id: JobId,
182            result: Option<CargoResult<()>>,
183        }
184
185        impl Drop for FinishOnDrop<'_> {
186            fn drop(&mut self) {
187                let result = self
188                    .result
189                    .take()
190                    .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
191                self.messages
192                    .push(Message::Finish(self.id, Artifact::All, result));
193            }
194        }
195    }
196
197    pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
198        self.messages
199            .push(Message::FutureIncompatReport(self.id, report));
200    }
201}