cargo/core/compiler/job_queue/mod.rs

//! Management of the interaction between the main `cargo` and all spawned jobs.
//!
//! ## Overview
//!
//! This module implements a job queue. A job here represents a unit of work,
//! which is roughly a rustc invocation, a build script run, or just a no-op.
//! The job queue primarily handles the following things:
//!
//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job is either
//!     executed on a spawned thread or run on the same thread to avoid the
//!     threading overhead.
//! * Controls the degree of concurrency. It allocates and manages [`jobserver`]
//!     tokens for each spawned rustc and build script.
//! * Manages the communication between the main `cargo` process and its
//!     spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared
//!     across threads.
//! * Schedules the execution order of each [`Job`]. Priorities are determined
//!     when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is
//!     relatively rudimentary and could likely be improved.
//!
//! A rough outline of building a queue and executing jobs is:
//!
//! 1. [`JobQueue::new`] to create a new, empty queue.
//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue.
//! 3. [`JobQueue::execute`] to consume the queue and execute all jobs.
//!
//! The primary loop happens inside [`JobQueue::execute`], which is effectively
//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name suggests,
//! the running state of the job queue while it is being drained.
//!
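//! Below is a minimal sketch of that flow. It is illustrative only: `bcx`,
//! `build_runner`, and `jobs_to_run` are stand-ins for a prepared
//! [`BuildContext`], the corresponding [`BuildRunner`], and whatever iterator
//! of `(Unit, Job)` pairs the caller has assembled.
//!
//! ```ignore
//! let mut queue = JobQueue::new(bcx);
//! for (unit, job) in jobs_to_run {
//!     queue.enqueue(build_runner, &unit, job)?;
//! }
//! // Consumes the queue, spawns worker threads, and drains all messages.
//! queue.execute(build_runner)?;
//! ```
//!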
//! ## Jobserver
//!
//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
//! relationship with each other. They share a single jobserver amongst what
//! is potentially hundreds of threads of work on many-cored systems.
//! The jobserver could come either from the environment (e.g., from a `make`
//! invocation) or from Cargo creating its own jobserver if there is none to
//! inherit from.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the `-j=N` parameter). Cargo also must not
//! spawn more than N threads of work: the total number of tokens floating
//! around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build
//! first or last; nor is it possible to decide whether to give an additional
//! token to rustc or to spawn a new crate of work instead. The algorithm in
//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
//! possible. In short, the jobserver relationship among Cargo and rustc
//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing about
//! parallelism beyond rustc processes[^parallel-rustc].
//!
//! We integrate with the [jobserver] crate, originating from the GNU make
//! [POSIX jobserver] protocol, to make sure that build scripts which use make
//! to build C code can cooperate with us on the number of tokens in use and
//! avoid oversubscribing the system we're on.
//!
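//! As a rough sketch of how tokens flow back into the main loop (this mirrors
//! the helper-thread setup in [`JobQueue::execute`]; `jobserver_client` and
//! `messages` are stand-ins for the jobserver client stored in the build
//! runner and the shared message [`Queue`]):
//!
//! ```ignore
//! // Every token the helper thread acquires is forwarded to the main loop as
//! // a message; the main loop stores it and uses it to admit one more job.
//! let helper = jobserver_client.into_helper_thread(move |token| {
//!     messages.push(Message::Token(token));
//! })?;
//! ```
//!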
//! ## Scheduling
//!
//! The current scheduling algorithm is not really polished. It is simply based
//! on a dependency graph, [`DependencyQueue`]. We keep adding nodes onto the
//! graph until we finalize it. When the graph is finalized, we compute, for
//! each node, the sum of the costs of its dependencies, including transitive
//! ones. That sum of dependency costs becomes the cost of the node itself.
//!
//! For the time being, the cost is just passed as a fixed placeholder in
//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities
//! here. For instance, we could start persisting timing information for each
//! build somewhere. For a subsequent build, we could look at that historical
//! data and perform a PGO-like optimization to prioritize jobs, making a build
//! fully pipelined.
//!
//! ## Message queue
//!
//! Each spawned thread running a process uses the message queue [`Queue`] to
//! send messages back to the main thread (the one running `cargo`).
//! The main thread coordinates everything and handles printing output.
//!
//! It is important to be careful about which messages use [`push`] versus
//! [`push_bounded`]. `push` is for priority messages (like tokens, or
//! "finished") where the sender shouldn't block. We want to handle those as
//! soon as possible so real work can proceed.
//!
//! `push_bounded` is only for messages being printed to stdout/stderr. Being
//! bounded prevents a flood of messages from consuming a large amount of
//! memory.
//!
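//! A minimal sketch of that convention (`messages`, `line`, and `id` are
//! stand-ins for the shared [`Queue`], one line of captured output, and the
//! sending job's [`JobId`]):
//!
//! ```ignore
//! // Output goes through the bounded path: a very chatty job eventually
//! // blocks instead of buffering unbounded amounts of output in memory.
//! messages.push_bounded(Message::Stderr(line));
//!
//! // Control messages use the unbounded path so the sender never blocks.
//! messages.push(Message::Finish(id, Artifact::All, Ok(())));
//! ```
//!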
//! `push` also avoids blocking, which helps avoid deadlocks. For example, when
//! the diagnostic server thread is dropped, it waits for the thread to exit.
//! But if the thread is blocked on a full queue and there is a critical
//! error, the drop will deadlock. This should be fixed at some point in the
//! future. The jobserver thread has a similar problem, though it times out
//! after 1 second.
//!
//! To access the message queue, each running `Job` is given its own [`JobState`],
//! containing everything it needs to communicate with the main thread.
//!
//! See [`Message`] for all available message kinds.
//!
//! [^parallel-rustc]: In fact, the `jobserver` that Cargo uses also manages the
//!     allocation of tokens to rustc beyond the implicit token each rustc owns
//!     (i.e., the ones used for parallel LLVM work and parallel rustc threads).
//!     See also ["Rust Compiler Development Guide: Parallel Compilation"]
//!     and [this comment][rustc-codegen] in rust-lang/rust.
//!
//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
//! [jobserver]: https://docs.rs/jobserver
//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded

113mod job;
114mod job_state;
115
116use std::cell::RefCell;
117use std::collections::{HashMap, HashSet};
118use std::fmt::Write as _;
119use std::io;
120use std::path::{Path, PathBuf};
121use std::sync::Arc;
122use std::thread::{self, Scope};
123use std::time::Duration;
124
125use anyhow::{Context as _, format_err};
126use jobserver::{Acquired, HelperThread};
127use semver::Version;
128use tracing::{debug, trace};
129
130pub use self::job::Freshness::{self, Dirty, Fresh};
131pub use self::job::{Job, Work};
132pub use self::job_state::JobState;
133use super::BuildContext;
134use super::BuildRunner;
135use super::CompileMode;
136use super::Unit;
137use super::UnitIndex;
138use super::custom_build::Severity;
139use super::timings::SectionTiming;
140use super::timings::Timings;
141use crate::core::compiler::descriptive_pkg_name;
142use crate::core::compiler::future_incompat::{
143    self, FutureBreakageItem, FutureIncompatReportPackage,
144};
145use crate::core::resolver::ResolveBehavior;
146use crate::core::{PackageId, Shell, TargetKind};
147use crate::util::CargoResult;
148use crate::util::context::WarningHandling;
149use crate::util::diagnostic_server::{self, DiagnosticPrinter};
150use crate::util::errors::AlreadyPrintedError;
151use crate::util::machine_message::{self, Message as _};
152use crate::util::{self, internal};
153use crate::util::{DependencyQueue, GlobalContext, Progress, ProgressStyle, Queue};
154
155/// This structure is backed by the `DependencyQueue` type and manages the
156/// queueing of compilation steps for each package. Packages enqueue units of
157/// work and then later on the entire graph is converted to `DrainState` and
158/// executed.
159pub struct JobQueue<'gctx> {
160    queue: DependencyQueue<Unit, Artifact, Job>,
161    counts: HashMap<PackageId, usize>,
162    timings: Timings<'gctx>,
163}
164
165/// This structure is backed by the `DependencyQueue` type and manages the
166/// actual compilation step of each package. Packages enqueue units of work and
167/// then later on the entire graph is processed and compiled.
168///
169/// It is created from `JobQueue` when we have fully assembled the crate graph
170/// (i.e., all package dependencies are known).
171struct DrainState<'gctx> {
172    // This is the length of the DependencyQueue when starting out
173    total_units: usize,
174
175    queue: DependencyQueue<Unit, Artifact, Job>,
176    messages: Arc<Queue<Message>>,
177    /// Diagnostic deduplication support.
178    diag_dedupe: DiagDedupe<'gctx>,
179    /// Count of warnings, used to print a summary after the job succeeds
180    warning_count: HashMap<JobId, WarningCount>,
181    active: HashMap<JobId, Unit>,
182    compiled: HashSet<PackageId>,
183    documented: HashSet<PackageId>,
184    scraped: HashSet<PackageId>,
185    counts: HashMap<PackageId, usize>,
186    progress: Progress<'gctx>,
187    next_id: u32,
188    timings: Timings<'gctx>,
189
190    /// Map from unit index to unit, for looking up dependency information.
191    index_to_unit: HashMap<UnitIndex, Unit>,
192
193    /// Tokens that are currently owned by this Cargo, and may be "associated"
194    /// with a rustc process. They may also be unused, though if so will be
195    /// dropped on the next loop iteration.
196    ///
197    /// Note that the length of this may be zero, but we will still spawn work,
198    /// as we share the implicit token given to this Cargo process with a
199    /// single rustc process.
200    tokens: Vec<Acquired>,
201
202    /// The list of jobs that we have not yet started executing, but have
203    /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
204    /// allow us to request jobserver tokens pretty early.
205    pending_queue: Vec<(Unit, Job, usize)>,
206    print: DiagnosticPrinter<'gctx>,
207
208    /// How many jobs we've finished
209    finished: usize,
210    per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
211}
212
213/// Count of warnings, used to print a summary after the job succeeds
214#[derive(Default)]
215pub struct WarningCount {
216    /// total number of warnings
217    pub total: usize,
218    /// number of lint warnings
219    pub lints: usize,
220    /// number of warnings that were suppressed because they
221    /// were duplicates of a previous warning
222    pub duplicates: usize,
    /// number of fixable warnings; set to `NotAllowed`
    /// if any errors have been seen for the current
    /// target
226    pub fixable: FixableWarnings,
227}
228
229impl WarningCount {
230    /// If an error is seen this should be called
231    /// to set `fixable` to `NotAllowed`
232    fn disallow_fixable(&mut self) {
233        self.fixable = FixableWarnings::NotAllowed;
234    }
235
    /// Checks whether fixable warnings are allowed.
    /// Fixable warnings are allowed if no errors have
    /// been seen for the current target. If an error
    /// was seen, `fixable` will be `NotAllowed`.
241    fn fixable_allowed(&self) -> bool {
242        match &self.fixable {
243            FixableWarnings::NotAllowed => false,
244            _ => true,
245        }
246    }
247}
248
249/// Used to keep track of how many fixable warnings there are
250/// and if fixable warnings are allowed
251#[derive(Default)]
252pub enum FixableWarnings {
253    NotAllowed,
254    #[default]
255    Zero,
256    Positive(usize),
257}
258
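/// Running tally of errors seen while draining the job queue; used to decide
/// whether to keep scheduling new work and to build the final error once the
/// queue has been fully drained.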
259pub struct ErrorsDuringDrain {
260    pub count: usize,
261}
262
263struct ErrorToHandle {
264    error: anyhow::Error,
265
266    /// This field is true for "interesting" errors and false for "mundane"
267    /// errors. If false, we print the above error only if it's the first one
268    /// encountered so far while draining the job queue.
269    ///
270    /// At most places that an error is propagated, we set this to false to
271    /// avoid scenarios where Cargo might end up spewing tons of redundant error
272    /// messages. For example if an i/o stream got closed somewhere, we don't
273    /// care about individually reporting every thread that it broke; just the
274    /// first is enough.
275    ///
276    /// The exception where `print_always` is true is that we do report every
277    /// instance of a rustc invocation that failed with diagnostics. This
278    /// corresponds to errors from `Message::Finish`.
279    print_always: bool,
280}
281
282impl<E> From<E> for ErrorToHandle
283where
284    anyhow::Error: From<E>,
285{
286    fn from(error: E) -> Self {
287        ErrorToHandle {
288            error: anyhow::Error::from(error),
289            print_always: false,
290        }
291    }
292}
293
294#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
295pub struct JobId(pub u32);
296
297impl std::fmt::Display for JobId {
298    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299        write!(f, "{}", self.0)
300    }
301}
302
303/// Handler for deduplicating diagnostics.
304struct DiagDedupe<'gctx> {
305    seen: RefCell<HashSet<u64>>,
306    gctx: &'gctx GlobalContext,
307}
308
309impl<'gctx> DiagDedupe<'gctx> {
310    fn new(gctx: &'gctx GlobalContext) -> Self {
311        DiagDedupe {
312            seen: RefCell::new(HashSet::new()),
313            gctx,
314        }
315    }
316
317    /// Emits a diagnostic message.
318    ///
319    /// Returns `true` if the message was emitted, or `false` if it was
320    /// suppressed for being a duplicate.
321    fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
322        let h = util::hash_u64(diag);
323        if !self.seen.borrow_mut().insert(h) {
324            return Ok(false);
325        }
326        let mut shell = self.gctx.shell();
327        shell.print_ansi_stderr(diag.as_bytes())?;
328        shell.err().write_all(b"\n")?;
329        Ok(true)
330    }
331}
332
333/// Possible artifacts that can be produced by compilations, used as edge values
334/// in the dependency graph.
335///
336/// As edge values we can have multiple kinds of edges depending on one node,
337/// for example some units may only depend on the metadata for an rlib while
338/// others depend on the full rlib. This `Artifact` enum is used to distinguish
339/// this case and track the progress of compilations as they proceed.
340#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
341enum Artifact {
342    /// A generic placeholder for "depends on everything run by a step" and
343    /// means that we can't start the next compilation until the previous has
344    /// finished entirely.
345    All,
346
347    /// A node indicating that we only depend on the metadata of a compilation,
348    /// but the compilation is typically also producing an rlib. We can start
349    /// our step, however, before the full rlib is available.
350    Metadata,
351}
352
353enum Message {
354    Run(JobId, String),
355    Stdout(String),
356    Stderr(String),
357
358    // This is for general stderr output from subprocesses
359    Diagnostic {
360        id: JobId,
361        level: String,
362        diag: String,
363        lint: bool,
364        fixable: bool,
365    },
366    // This handles duplicate output that is suppressed, for showing
367    // only a count of duplicate messages instead
368    WarningCount {
369        id: JobId,
370        lint: bool,
371        emitted: bool,
372        fixable: bool,
373    },
374    // This is for warnings generated by Cargo's interpretation of the
375    // subprocess output, e.g. scrape-examples prints a warning if a
376    // unit fails to be scraped
377    Warning {
378        id: JobId,
379        warning: String,
380    },
381
382    FixDiagnostic(diagnostic_server::Message),
383    Token(io::Result<Acquired>),
384    Finish(JobId, Artifact, CargoResult<()>),
385    FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
386    SectionTiming(JobId, SectionTiming),
387}
388
389impl<'gctx> JobQueue<'gctx> {
390    pub fn new(bcx: &BuildContext<'_, 'gctx>) -> JobQueue<'gctx> {
391        JobQueue {
392            queue: DependencyQueue::new(),
393            counts: HashMap::new(),
394            timings: Timings::new(bcx, &bcx.roots),
395        }
396    }
397
398    pub fn enqueue(
399        &mut self,
400        build_runner: &BuildRunner<'_, 'gctx>,
401        unit: &Unit,
402        job: Job,
403    ) -> CargoResult<()> {
404        let dependencies = build_runner.unit_deps(unit);
405        let mut queue_deps = dependencies
406            .iter()
407            .filter(|dep| {
408                // Binaries aren't actually needed to *compile* tests, just to run
409                // them, so we don't include this dependency edge in the job graph.
410                // But we shouldn't filter out dependencies being scraped for Rustdoc.
411                (!dep.unit.target.is_test() && !dep.unit.target.is_bin())
412                    || dep.unit.artifact.is_true()
413                    || dep.unit.mode.is_doc_scrape()
414            })
415            .map(|dep| {
416                // Handle the case here where our `unit -> dep` dependency may
417                // only require the metadata, not the full compilation to
418                // finish. Use the tables in `build_runner` to figure out what
419                // kind of artifact is associated with this dependency.
420                let artifact = if build_runner.only_requires_rmeta(unit, &dep.unit) {
421                    Artifact::Metadata
422                } else {
423                    Artifact::All
424                };
425                (dep.unit.clone(), artifact)
426            })
427            .collect::<HashMap<_, _>>();
428
429        // This is somewhat tricky, but we may need to synthesize some
430        // dependencies for this target if it requires full upstream
431        // compilations to have completed. Because of pipelining, some
432        // dependency edges may be `Metadata` due to the above clause (as
433        // opposed to everything being `All`). For example consider:
434        //
435        //    a (binary)
436        //    └ b (lib)
437        //        └ c (lib)
438        //
439        // Here the dependency edge from B to C will be `Metadata`, and the
440        // dependency edge from A to B will be `All`. For A to be compiled,
441        // however, it currently actually needs the full rlib of C. This means
442        // that we need to synthesize a dependency edge for the dependency graph
443        // from A to C. That's done here.
444        //
445        // This will walk all dependencies of the current target, and if any of
446        // *their* dependencies are `Metadata` then we depend on the `All` of
447        // the target as well. This should ensure that edges changed to
448        // `Metadata` propagate upwards `All` dependencies to anything that
449        // transitively contains the `Metadata` edge.
450        if unit.requires_upstream_objects() {
451            for dep in dependencies {
452                depend_on_deps_of_deps(build_runner, &mut queue_deps, dep.unit.clone());
453            }
454
455            fn depend_on_deps_of_deps(
456                build_runner: &BuildRunner<'_, '_>,
457                deps: &mut HashMap<Unit, Artifact>,
458                unit: Unit,
459            ) {
460                for dep in build_runner.unit_deps(&unit) {
461                    if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
462                        depend_on_deps_of_deps(build_runner, deps, dep.unit.clone());
463                    }
464                }
465            }
466        }
467
468        // For now we use a fixed placeholder value for the cost of each unit, but
469        // in the future this could be used to allow users to provide hints about
470        // relative expected costs of units, or this could be automatically set in
471        // a smarter way using timing data from a previous compilation.
472        self.queue.queue(unit.clone(), job, queue_deps, 100);
473        *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
474        Ok(())
475    }
476
477    /// Executes all jobs necessary to build the dependency graph.
478    ///
479    /// This function will spawn off `config.jobs()` workers to build all of the
480    /// necessary dependencies, in order. Freshness is propagated as far as
481    /// possible along each dependency chain.
482    #[tracing::instrument(skip_all)]
483    pub fn execute(mut self, build_runner: &mut BuildRunner<'_, '_>) -> CargoResult<()> {
484        self.queue.queue_finished();
485
486        let progress =
487            Progress::with_style("Building", ProgressStyle::Ratio, build_runner.bcx.gctx);
488        let state = DrainState {
489            total_units: self.queue.len(),
490            queue: self.queue,
            // 100 here is somewhat arbitrary. It is a few screenfuls of
492            // output, and hopefully at most a few megabytes of memory for
493            // typical messages. If you change this, please update the test
494            // caching_large_output, too.
495            messages: Arc::new(Queue::new(100)),
496            diag_dedupe: DiagDedupe::new(build_runner.bcx.gctx),
497            warning_count: HashMap::new(),
498            active: HashMap::new(),
499            compiled: HashSet::new(),
500            documented: HashSet::new(),
501            scraped: HashSet::new(),
502            counts: self.counts,
503            progress,
504            next_id: 0,
505            timings: self.timings,
506            index_to_unit: build_runner
507                .bcx
508                .unit_to_index
509                .iter()
510                .map(|(unit, &index)| (index, unit.clone()))
511                .collect(),
512            tokens: Vec::new(),
513            pending_queue: Vec::new(),
514            print: DiagnosticPrinter::new(
515                build_runner.bcx.gctx,
516                &build_runner.bcx.rustc().workspace_wrapper,
517            ),
518            finished: 0,
519            per_package_future_incompat_reports: Vec::new(),
520        };
521
522        // Create a helper thread for acquiring jobserver tokens
523        let messages = state.messages.clone();
524        let helper = build_runner
525            .jobserver
526            .clone()
527            .into_helper_thread(move |token| {
528                messages.push(Message::Token(token));
529            })
530            .context("failed to create helper thread for jobserver management")?;
531
532        // Create a helper thread to manage the diagnostics for rustfix if
533        // necessary.
534        let messages = state.messages.clone();
535        // It is important that this uses `push` instead of `push_bounded` for
536        // now. If someone wants to fix this to be bounded, the `drop`
537        // implementation needs to be changed to avoid possible deadlocks.
538        let _diagnostic_server = build_runner
539            .bcx
540            .build_config
541            .rustfix_diagnostic_server
542            .borrow_mut()
543            .take()
544            .map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
545
546        thread::scope(
547            move |scope| match state.drain_the_queue(build_runner, scope, &helper) {
548                Some(err) => Err(err),
549                None => Ok(()),
550            },
551        )
552    }
553}
554
555impl<'gctx> DrainState<'gctx> {
556    fn spawn_work_if_possible<'s>(
557        &mut self,
558        build_runner: &mut BuildRunner<'_, '_>,
559        jobserver_helper: &HelperThread,
560        scope: &'s Scope<'s, '_>,
561    ) -> CargoResult<()> {
562        // Dequeue as much work as we can, learning about everything
563        // possible that can run. Note that this is also the point where we
564        // start requesting job tokens. Each job after the first needs to
565        // request a token.
566        while let Some((unit, job, priority)) = self.queue.dequeue() {
567            // We want to keep the pieces of work in the `pending_queue` sorted
568            // by their priorities, and insert the current job at its correctly
569            // sorted position: following the lower priority jobs, and the ones
570            // with the same priority (since they were dequeued before the
571            // current one, we also keep that relation).
572            let idx = self
573                .pending_queue
574                .partition_point(|&(_, _, p)| p <= priority);
575            self.pending_queue.insert(idx, (unit, job, priority));
576            if self.active.len() + self.pending_queue.len() > 1 {
577                jobserver_helper.request_token();
578            }
579        }
580
581        // Now that we've learned of all possible work that we can execute
582        // try to spawn it so long as we've got a jobserver token which says
583        // we're able to perform some parallel work.
584        // The `pending_queue` is sorted in ascending priority order, and we
585        // remove items from its end to schedule the highest priority items
586        // sooner.
587        while self.has_extra_tokens() && !self.pending_queue.is_empty() {
588            let (unit, job, _) = self.pending_queue.pop().unwrap();
589            *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
590            // Print out some nice progress information.
591            // NOTE: An error here will drop the job without starting it.
592            // That should be OK, since we want to exit as soon as
593            // possible during an error.
594            self.note_working_on(
595                build_runner.bcx.gctx,
596                build_runner.bcx.ws.root(),
597                &unit,
598                job.freshness(),
599            )?;
600            self.run(&unit, job, build_runner, scope);
601        }
602
603        Ok(())
604    }
605
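    /// Whether we can run one more job concurrently: each acquired jobserver
    /// token backs one active job, plus the single implicit token this Cargo
    /// process already owns (hence the `+ 1`).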
606    fn has_extra_tokens(&self) -> bool {
607        self.active.len() < self.tokens.len() + 1
608    }
609
610    fn handle_event(
611        &mut self,
612        build_runner: &mut BuildRunner<'_, '_>,
613        event: Message,
614    ) -> Result<(), ErrorToHandle> {
615        let warning_handling = build_runner.bcx.gctx.warning_handling()?;
616        match event {
617            Message::Run(id, cmd) => {
618                build_runner
619                    .bcx
620                    .gctx
621                    .shell()
622                    .verbose(|c| c.status("Running", &cmd))?;
623                self.timings
624                    .unit_start(build_runner, id, self.active[&id].clone());
625            }
626            Message::Stdout(out) => {
627                writeln!(build_runner.bcx.gctx.shell().out(), "{}", out)?;
628            }
629            Message::Stderr(err) => {
630                let mut shell = build_runner.bcx.gctx.shell();
631                shell.print_ansi_stderr(err.as_bytes())?;
632                shell.err().write_all(b"\n")?;
633            }
634            Message::Diagnostic {
635                id,
636                level,
637                diag,
638                lint,
639                fixable,
640            } => {
641                let emitted = self.diag_dedupe.emit_diag(&diag)?;
642                if level == "warning" {
643                    self.bump_warning_count(id, lint, emitted, fixable);
644                }
645                if level == "error" {
646                    let cnts = self.warning_count.entry(id).or_default();
647                    // If there is an error, the `cargo fix` message should not show
648                    cnts.disallow_fixable();
649                }
650            }
651            Message::Warning { id, warning } => {
652                if warning_handling != WarningHandling::Allow {
653                    build_runner.bcx.gctx.shell().warn(warning)?;
654                }
655                let lint = false;
656                let emitted = true;
657                let fixable = false;
658                self.bump_warning_count(id, lint, emitted, fixable);
659            }
660            Message::WarningCount {
661                id,
662                lint,
663                emitted,
664                fixable,
665            } => {
666                self.bump_warning_count(id, lint, emitted, fixable);
667            }
668            Message::FixDiagnostic(msg) => {
669                self.print.print(&msg)?;
670            }
671            Message::Finish(id, artifact, result) => {
672                let unit = match artifact {
673                    // If `id` has completely finished we remove it
674                    // from the `active` map ...
675                    Artifact::All => {
676                        trace!("end: {:?}", id);
677                        self.finished += 1;
678                        self.report_warning_count(
679                            build_runner,
680                            id,
681                            &build_runner.bcx.rustc().workspace_wrapper,
682                        );
683                        self.active.remove(&id).unwrap()
684                    }
685                    // ... otherwise if it hasn't finished we leave it
686                    // in there as we'll get another `Finish` later on.
687                    Artifact::Metadata => {
688                        trace!("end (meta): {:?}", id);
689                        self.active[&id].clone()
690                    }
691                };
692                debug!("end ({:?}): {:?}", unit, result);
693                match result {
694                    Ok(()) => self.finish(id, &unit, artifact, build_runner)?,
695                    Err(_) if build_runner.bcx.unit_can_fail_for_docscraping(&unit) => {
696                        build_runner
697                            .failed_scrape_units
698                            .lock()
699                            .unwrap()
700                            .insert(build_runner.files().metadata(&unit).unit_id());
701                        self.queue.finish(&unit, &artifact);
702                    }
703                    Err(error) => {
704                        let show_warnings = true;
705                        self.emit_log_messages(&unit, build_runner, show_warnings)?;
706                        self.back_compat_notice(build_runner, &unit)?;
707                        return Err(ErrorToHandle {
708                            error,
709                            print_always: true,
710                        });
711                    }
712                }
713            }
714            Message::FutureIncompatReport(id, items) => {
715                let unit = &self.active[&id];
716                let package_id = unit.pkg.package_id();
717                let is_local = unit.is_local();
718                self.per_package_future_incompat_reports
719                    .push(FutureIncompatReportPackage {
720                        package_id,
721                        is_local,
722                        items,
723                    });
724            }
725            Message::Token(acquired_token) => {
726                let token = acquired_token.context("failed to acquire jobserver token")?;
727                self.tokens.push(token);
728            }
729            Message::SectionTiming(id, section) => {
730                self.timings.unit_section_timing(build_runner, id, &section);
731            }
732        }
733
734        Ok(())
735    }
736
737    // This will also tick the progress bar as appropriate
738    fn wait_for_events(&mut self) -> Vec<Message> {
739        // Drain all events at once to avoid displaying the progress bar
        // unnecessarily. If there are no events we actually block waiting for
741        // an event, but we keep a "heartbeat" going to allow `record_cpu`
742        // to run above to calculate CPU usage over time. To do this we
743        // listen for a message with a timeout, and on timeout we run the
744        // previous parts of the loop again.
745        let mut events = self.messages.try_pop_all();
746        if events.is_empty() {
747            loop {
748                self.tick_progress();
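                // Relinquish tokens beyond what the active jobs need: one of
                // them is covered by our implicit token, so keep at most
                // `active - 1` explicitly acquired tokens.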
749                self.tokens.truncate(self.active.len() - 1);
750                match self.messages.pop(Duration::from_millis(500)) {
751                    Some(message) => {
752                        events.push(message);
753                        break;
754                    }
755                    None => continue,
756                }
757            }
758        }
759        events
760    }
761
762    /// This is the "main" loop, where Cargo does all work to run the
763    /// compiler.
764    ///
765    /// This returns an Option to prevent the use of `?` on `Result` types
766    /// because it is important for the loop to carefully handle errors.
767    fn drain_the_queue<'s>(
768        mut self,
769        build_runner: &mut BuildRunner<'_, '_>,
770        scope: &'s Scope<'s, '_>,
771        jobserver_helper: &HelperThread,
772    ) -> Option<anyhow::Error> {
773        trace!("queue: {:#?}", self.queue);
774
775        // Iteratively execute the entire dependency graph. Each turn of the
776        // loop starts out by scheduling as much work as possible (up to the
777        // maximum number of parallel jobs we have tokens for). A local queue
778        // is maintained separately from the main dependency queue as one
779        // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
780        // in one package).
781        //
782        // After a job has finished we update our internal state if it was
783        // successful and otherwise wait for pending work to finish if it failed
784        // and then immediately return (or keep going, if requested by the build
785        // config).
786        let mut errors = ErrorsDuringDrain { count: 0 };
787        // CAUTION! Do not use `?` or break out of the loop early. Every error
788        // must be handled in such a way that the loop is still allowed to
789        // drain event messages.
790        loop {
791            if errors.count == 0 || build_runner.bcx.build_config.keep_going {
792                if let Err(e) = self.spawn_work_if_possible(build_runner, jobserver_helper, scope) {
793                    self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
794                }
795            }
796
797            // If after all that we're not actually running anything then we're
798            // done!
799            if self.active.is_empty() {
800                break;
801            }
802
803            // And finally, before we block waiting for the next event, drop any
804            // excess tokens we may have accidentally acquired. Due to how our
805            // jobserver interface is architected we may acquire a token that we
806            // don't actually use, and if this happens just relinquish it back
807            // to the jobserver itself.
808            for event in self.wait_for_events() {
809                if let Err(event_err) = self.handle_event(build_runner, event) {
810                    self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, event_err);
811                }
812            }
813        }
814        self.progress.clear();
815
816        let profile_name = build_runner.bcx.build_config.requested_profile;
817        // NOTE: this may be a bit inaccurate, since this may not display the
818        // profile for what was actually built. Profile overrides can change
819        // these settings, and in some cases different targets are built with
820        // different profiles. To be accurate, it would need to collect a
821        // list of Units built, and maybe display a list of the different
822        // profiles used. However, to keep it simple and compatible with old
823        // behavior, we just display what the base profile is.
824        let profile = build_runner.bcx.profiles.base_profile();
825        let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
826            "unoptimized"
827        } else {
828            "optimized"
829        });
830        if profile.debuginfo.is_turned_on() {
831            opt_type += " + debuginfo";
832        }
833
834        let time_elapsed = util::elapsed(build_runner.bcx.gctx.creation_time().elapsed());
835        if let Err(e) = self
836            .timings
837            .finished(build_runner, &errors.to_error())
838            .context("failed to render timing report")
839        {
840            self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
841        }
842        if build_runner.bcx.build_config.emit_json() {
843            let mut shell = build_runner.bcx.gctx.shell();
844            let msg = machine_message::BuildFinished {
845                success: errors.count == 0,
846            }
847            .to_json_string();
848            if let Err(e) = writeln!(shell.out(), "{}", msg) {
849                self.handle_error(&mut shell, &mut errors, e);
850            }
851        }
852
853        if let Some(error) = errors.to_error() {
854            // Any errors up to this point have already been printed via the
855            // `display_error` inside `handle_error`.
856            Some(anyhow::Error::new(AlreadyPrintedError::new(error)))
857        } else if self.queue.is_empty() && self.pending_queue.is_empty() {
858            let profile_link = build_runner.bcx.gctx.shell().err_hyperlink(
859                "https://doc.rust-lang.org/cargo/reference/profiles.html#default-profiles",
860            );
861            let message = format!(
862                "{profile_link}`{profile_name}` profile [{opt_type}]{profile_link:#} target(s) in {time_elapsed}",
863            );
864            // It doesn't really matter if this fails.
865            let _ = build_runner.bcx.gctx.shell().status("Finished", message);
866            future_incompat::save_and_display_report(
867                build_runner.bcx,
868                &self.per_package_future_incompat_reports,
869            );
870
871            None
872        } else {
873            debug!("queue: {:#?}", self.queue);
874            Some(internal("finished with jobs still left in the queue"))
875        }
876    }
877
878    fn handle_error(
879        &mut self,
880        shell: &mut Shell,
881        err_state: &mut ErrorsDuringDrain,
882        new_err: impl Into<ErrorToHandle>,
883    ) {
884        let new_err = new_err.into();
885        if new_err.print_always || err_state.count == 0 {
886            crate::display_error(&new_err.error, shell);
887            if err_state.count == 0 && !self.active.is_empty() {
888                self.progress.indicate_error();
889                let _ = shell.warn("build failed, waiting for other jobs to finish...");
890            }
891            err_state.count += 1;
892        } else {
893            tracing::warn!("{:?}", new_err.error);
894        }
895    }
896
897    // This also records CPU usage and marks concurrency; we roughly want to do
898    // this as often as we spin on the events receiver (at least every 500ms or
899    // so).
900    fn tick_progress(&mut self) {
901        // Record some timing information if `--timings` is enabled, and
902        // this'll end up being a noop if we're not recording this
903        // information.
904        self.timings.record_cpu();
905
906        let active_names = self
907            .active
908            .values()
909            .map(|u| self.name_for_progress(u))
910            .collect::<Vec<_>>();
911        let _ = self.progress.tick_now(
912            self.finished,
913            self.total_units,
914            &format!(": {}", active_names.join(", ")),
915        );
916    }
917
918    fn name_for_progress(&self, unit: &Unit) -> String {
919        let pkg_name = unit.pkg.name();
920        let target_name = unit.target.name();
921        match unit.mode {
922            CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
923            CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
924            CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
925                TargetKind::Lib(_) => format!("{}(test)", target_name),
926                TargetKind::CustomBuild => panic!("cannot test build script"),
927                TargetKind::Bin => format!("{}(bin test)", target_name),
928                TargetKind::Test => format!("{}(test)", target_name),
929                TargetKind::Bench => format!("{}(bench)", target_name),
930                TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
931                    format!("{}(example test)", target_name)
932                }
933            },
934            _ => match unit.target.kind() {
935                TargetKind::Lib(_) => pkg_name.to_string(),
936                TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
937                TargetKind::Bin => format!("{}(bin)", target_name),
938                TargetKind::Test => format!("{}(test)", target_name),
939                TargetKind::Bench => format!("{}(bench)", target_name),
940                TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
941                    format!("{}(example)", target_name)
942                }
943            },
944        }
945    }
946
947    /// Executes a job.
948    ///
    /// Fresh jobs block until finished (which should be very fast!), while
    /// Dirty jobs will spawn a thread in the background and return immediately.
951    fn run<'s>(
952        &mut self,
953        unit: &Unit,
954        job: Job,
955        build_runner: &BuildRunner<'_, '_>,
956        scope: &'s Scope<'s, '_>,
957    ) {
958        let id = JobId(self.next_id);
959        self.next_id = self.next_id.checked_add(1).unwrap();
960
961        debug!("start {}: {:?}", id, unit);
962
963        assert!(self.active.insert(id, unit.clone()).is_none());
964
965        let messages = self.messages.clone();
966        let is_fresh = job.freshness().is_fresh();
967        let rmeta_required = build_runner.rmeta_required(unit);
968        let lock_manager = build_runner.lock_manager.clone();
969
970        let doit = move |diag_dedupe| {
971            let state = JobState::new(id, messages, diag_dedupe, rmeta_required, lock_manager);
972            state.run_to_finish(job);
973        };
974
975        match is_fresh {
976            true => {
977                self.timings.add_fresh();
978                // Running a fresh job on the same thread is often much faster than spawning a new
979                // thread to run the job.
980                doit(Some(&self.diag_dedupe));
981            }
982            false => {
983                self.timings.add_dirty();
984                scope.spawn(move || doit(None));
985            }
986        }
987    }
988
989    fn emit_log_messages(
990        &self,
991        unit: &Unit,
992        build_runner: &mut BuildRunner<'_, '_>,
993        show_warnings: bool,
994    ) -> CargoResult<()> {
995        let outputs = build_runner.build_script_outputs.lock().unwrap();
996        let Some(metadata_vec) = build_runner.find_build_script_metadatas(unit) else {
997            return Ok(());
998        };
999        let bcx = &mut build_runner.bcx;
1000        for metadata in metadata_vec {
1001            if let Some(output) = outputs.get(metadata) {
1002                if !output.log_messages.is_empty()
1003                    && (show_warnings
1004                        || output
1005                            .log_messages
1006                            .iter()
1007                            .any(|(severity, _)| *severity == Severity::Error))
1008                {
1009                    let msg_with_package =
1010                        |msg: &str| format!("{}@{}: {}", unit.pkg.name(), unit.pkg.version(), msg);
1011
1012                    for (severity, message) in output.log_messages.iter() {
1013                        match severity {
1014                            Severity::Error => {
1015                                bcx.gctx.shell().error(msg_with_package(message))?;
1016                            }
1017                            Severity::Warning => {
1018                                bcx.gctx.shell().warn(msg_with_package(message))?;
1019                            }
1020                        }
1021                    }
1022                }
1023            }
1024        }
1025
1026        Ok(())
1027    }
1028
1029    fn bump_warning_count(&mut self, id: JobId, lint: bool, emitted: bool, fixable: bool) {
1030        let cnts = self.warning_count.entry(id).or_default();
1031        cnts.total += 1;
1032        if lint {
1033            cnts.lints += 1;
1034        }
1035        if !emitted {
1036            cnts.duplicates += 1;
        // Don't add to fixable if this warning is a duplicate of one that has
        // already been emitted
        } else if fixable {
            // Do not add anything to the fixable warning count if it
            // is `NotAllowed`, since that indicates there was an
            // error while building this `Unit`
1042            if cnts.fixable_allowed() {
1043                cnts.fixable = match cnts.fixable {
1044                    FixableWarnings::NotAllowed => FixableWarnings::NotAllowed,
1045                    FixableWarnings::Zero => FixableWarnings::Positive(1),
1046                    FixableWarnings::Positive(fixable) => FixableWarnings::Positive(fixable + 1),
1047                };
1048            }
1049        }
1050    }
1051
1052    /// Displays a final report of the warnings emitted by a particular job.
1053    fn report_warning_count(
1054        &mut self,
1055        runner: &mut BuildRunner<'_, '_>,
1056        id: JobId,
1057        rustc_workspace_wrapper: &Option<PathBuf>,
1058    ) {
1059        let gctx = runner.bcx.gctx;
1060        let count = match self.warning_count.get(&id) {
            // An error could add an entry for a `Unit` with 0 warnings,
            // only to mark its fixable warnings as disallowed
1064            Some(count) if count.total > 0 => count,
1065            None | Some(_) => return,
1066        };
1067        runner.compilation.lint_warning_count += count.lints;
1068        let unit = &self.active[&id];
1069        let mut message = descriptive_pkg_name(&unit.pkg.name(), &unit.target, &unit.mode);
1070        message.push_str(" generated ");
1071        match count.total {
1072            1 => message.push_str("1 warning"),
1073            n => {
1074                let _ = write!(message, "{} warnings", n);
1075            }
1076        };
1077        match count.duplicates {
1078            0 => {}
1079            1 => message.push_str(" (1 duplicate)"),
1080            n => {
1081                let _ = write!(message, " ({} duplicates)", n);
1082            }
1083        }
        // Only show the `cargo fix` message if it's a local `Unit`
1085        if unit.is_local() {
1086            // Do not show this if there are any errors or no fixable warnings
1087            if let FixableWarnings::Positive(fixable) = count.fixable {
1088                // `cargo fix` doesn't have an option for custom builds
1089                if !unit.target.is_custom_build() {
1090                    // To make sure the correct command is shown for `clippy` we
1091                    // check if `RUSTC_WORKSPACE_WRAPPER` is set and pointing towards
1092                    // `clippy-driver`.
1093                    let clippy = std::ffi::OsStr::new("clippy-driver");
1094                    let command = match rustc_workspace_wrapper.as_ref().and_then(|x| x.file_stem())
1095                    {
1096                        Some(wrapper) if wrapper == clippy => "cargo clippy --fix",
1097                        _ => "cargo fix",
1098                    };
1099                    let mut args =
1100                        format!("{} -p {}", unit.target.description_named(), unit.pkg.name());
1101                    if unit.mode.is_rustc_test()
1102                        && !(unit.target.is_test() || unit.target.is_bench())
1103                    {
1104                        args.push_str(" --tests");
1105                    }
1106                    let mut suggestions = format!("{} suggestion", fixable);
1107                    if fixable > 1 {
1108                        suggestions.push_str("s")
1109                    }
1110                    let _ = write!(
1111                        message,
1112                        " (run `{command} --{args}` to apply {suggestions})"
1113                    );
1114                }
1115            }
1116        }
1117        // Errors are ignored here because it is tricky to handle them
1118        // correctly, and they aren't important.
1119        let _ = gctx.shell().warn(message);
1120    }
1121
1122    fn finish(
1123        &mut self,
1124        id: JobId,
1125        unit: &Unit,
1126        artifact: Artifact,
1127        build_runner: &mut BuildRunner<'_, '_>,
1128    ) -> CargoResult<()> {
1129        if unit.mode.is_run_custom_build() {
1130            self.emit_log_messages(
1131                unit,
1132                build_runner,
1133                unit.show_warnings(build_runner.bcx.gctx),
1134            )?;
1135        }
1136        let unblocked = self.queue.finish(unit, &artifact);
1137        match artifact {
1138            Artifact::All => self.timings.unit_finished(build_runner, id, unblocked),
1139            Artifact::Metadata => self
1140                .timings
1141                .unit_rmeta_finished(build_runner, id, unblocked),
1142        }
1143        Ok(())
1144    }
1145
1146    // This isn't super trivial because we don't want to print loads and
1147    // loads of information to the console, but we also want to produce a
1148    // faithful representation of what's happening. This is somewhat nuanced
1149    // as a package can start compiling *very* early on because of custom
1150    // build commands and such.
1151    //
1152    // In general, we try to print "Compiling" for the first nontrivial task
1153    // run for a package, regardless of when that is. We then don't print
1154    // out any more information for a package after we've printed it once.
1155    fn note_working_on(
1156        &mut self,
1157        gctx: &GlobalContext,
1158        ws_root: &Path,
1159        unit: &Unit,
1160        fresh: &Freshness,
1161    ) -> CargoResult<()> {
1162        if (self.compiled.contains(&unit.pkg.package_id())
1163            && !unit.mode.is_doc()
1164            && !unit.mode.is_doc_scrape())
1165            || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
1166            || (self.scraped.contains(&unit.pkg.package_id()) && unit.mode.is_doc_scrape())
1167        {
1168            return Ok(());
1169        }
1170
1171        match fresh {
1172            // Any dirty stage which runs at least one command gets printed as
1173            // being a compiled package.
1174            Dirty(dirty_reason) => {
1175                if !dirty_reason.is_fresh_build() {
1176                    gctx.shell().verbose(|shell| {
1177                        dirty_reason.present_to(shell, unit, ws_root, &self.index_to_unit)
1178                    })?;
1179                }
1180
1181                if unit.mode.is_doc() {
1182                    self.documented.insert(unit.pkg.package_id());
1183                    gctx.shell().status("Documenting", &unit.pkg)?;
1184                } else if unit.mode.is_doc_test() {
1185                    // Skip doc test.
1186                } else if unit.mode.is_doc_scrape() {
1187                    self.scraped.insert(unit.pkg.package_id());
1188                    gctx.shell().status("Scraping", &unit.pkg)?;
1189                } else {
1190                    self.compiled.insert(unit.pkg.package_id());
1191                    if unit.mode.is_check() {
1192                        gctx.shell().status("Checking", &unit.pkg)?;
1193                    } else {
1194                        gctx.shell().status("Compiling", &unit.pkg)?;
1195                    }
1196                }
1197            }
1198            Fresh => {
                // If doc tests are last, only print "Fresh" if nothing has been printed.
1200                if self.counts[&unit.pkg.package_id()] == 0
1201                    && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
1202                {
1203                    self.compiled.insert(unit.pkg.package_id());
1204                    gctx.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
1205                }
1206            }
1207        }
1208        Ok(())
1209    }
1210
1211    fn back_compat_notice(
1212        &self,
1213        build_runner: &BuildRunner<'_, '_>,
1214        unit: &Unit,
1215    ) -> CargoResult<()> {
1216        if unit.pkg.name() != "diesel"
1217            || unit.pkg.version() >= &Version::new(1, 4, 8)
1218            || build_runner.bcx.ws.resolve_behavior() == ResolveBehavior::V1
1219            || !unit.pkg.package_id().source_id().is_registry()
1220            || !unit.features.is_empty()
1221        {
1222            return Ok(());
1223        }
1224        if !build_runner
1225            .bcx
1226            .unit_graph
1227            .keys()
1228            .any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
1229        {
1230            return Ok(());
1231        }
1232        build_runner.bcx.gctx.shell().note(
1233            "\
1234This error may be due to an interaction between diesel and Cargo's new
1235feature resolver. Try updating to diesel 1.4.8 to fix this error.
1236",
1237        )?;
1238        Ok(())
1239    }
1240}
1241
1242impl ErrorsDuringDrain {
1243    fn to_error(&self) -> Option<anyhow::Error> {
1244        match self.count {
1245            0 => None,
1246            1 => Some(format_err!("1 job failed")),
1247            n => Some(format_err!("{} jobs failed", n)),
1248        }
1249    }
1250}