cargo/core/compiler/job_queue/mod.rs
//! Management of the interaction between the main `cargo` and all spawned jobs.
//!
//! ## Overview
//!
//! This module implements a job queue. A job here represents a unit of work,
//! which is roughly a rustc invocation, a build script run, or just a no-op.
//! The job queue primarily handles the following things:
//!
//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job could be
//!   either executed on a spawned thread or run on the same thread to avoid
//!   the threading overhead.
//! * Controls the degree of concurrency. It allocates and manages [`jobserver`]
//!   tokens for each spawned rustc and build script.
//! * Manages the communication between the main `cargo` process and its
//!   spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared
//!   across threads.
//! * Schedules the execution order of each [`Job`]. Priorities are determined
//!   when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is
//!   relatively rudimentary and could likely be improved.
//!
//! A rough outline of building a queue and executing jobs is:
//!
//! 1. [`JobQueue::new`] to simply create one queue.
//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue.
//! 3. [`JobQueue::execute`] to consume the queue and execute all jobs.
//!
//! The primary loop happens inside [`JobQueue::execute`], which is effectively
//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name suggests,
//! the running state of the job queue as it is drained.
//!
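//! In code, the outline above corresponds roughly to the following sketch
//! (error handling elided; `units` stands in for however the unit/job pairs
//! were computed):
//!
//! ```text
//! let mut queue = JobQueue::new(bcx);
//! for (unit, job) in units {
//!     queue.enqueue(build_runner, &unit, job)?;
//! }
//! // Consumes the queue and loops inside DrainState::drain_the_queue.
//! queue.execute(&mut build_runner)?;
//! ```
//!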
//! ## Jobserver
//!
//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
//! relationship with each other. They share a single jobserver amongst what
//! is potentially hundreds of threads of work on many-cored systems.
//! The jobserver could come from either the environment (e.g., from a `make`
//! invocation), or from Cargo creating its own jobserver if there is none
//! to inherit from.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the `-j=N` parameter). Cargo also must not
//! spawn more than N threads of work: the total number of tokens we have
//! floating around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build
//! first or last; nor is it possible to decide whether to give an additional
//! token to rustc first or rather spawn a new crate of work. The algorithm in
//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
//! possible. In short, the jobserver relationship among Cargo and rustc
//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing beyond rustc
//! processes in terms of parallelism[^parallel-rustc].
//!
//! We integrate with the [jobserver] crate, originating from the GNU make
//! [POSIX jobserver], to make sure that build scripts which use make to
//! build C code can cooperate with us on the number of used tokens and
//! avoid overloading the system we're on.
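//!
//! Concretely, the token invariant Cargo maintains can be sketched as
//! (this mirrors `DrainState::has_extra_tokens` later in this file):
//!
//! ```text
//! // One implicit token for the cargo process itself, plus one explicit
//! // jobserver token for each additional unit of parallelism:
//! active_jobs <= acquired_tokens + 1
//! ```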
//!
//! ## Scheduling
//!
//! The current scheduling algorithm is not really polished. It is simply based
//! on a dependency graph [`DependencyQueue`]. We continue adding nodes onto
//! the graph until we finalize it. When the graph gets finalized, it finds the
//! sum of the costs of each node's dependencies, including transitive ones.
//! That sum of dependency costs becomes the cost of the given node.
//!
//! For the time being, the cost is just passed as a fixed placeholder in
//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities
//! around it. For instance, we could start persisting timing information for
//! each build somewhere. For a subsequent build, we could look into the
//! historical data and perform a PGO-like optimization to prioritize jobs,
//! making a build fully pipelined.
//!
//! ## Message queue
//!
//! Each spawned thread running a process uses the message queue [`Queue`] to
//! send messages back to the main thread (the one running `cargo`).
//! The main thread coordinates everything and handles printing output.
//!
//! It is important to be careful about which messages use [`push`] vs
//! [`push_bounded`]. `push` is for priority messages (like tokens, or
//! "finished") where the sender shouldn't block. We want to handle those
//! so that real work can proceed ASAP.
//!
//! `push_bounded` is only for messages being printed to stdout/stderr. Being
//! bounded prevents a flood of messages from consuming a large amount of
//! memory.
//!
//! `push` also avoids blocking, which helps avoid deadlocks. For example, when
//! the diagnostic server thread is dropped, it waits for the thread to exit.
//! But if the thread is blocked on a full queue and there is a critical
//! error, the drop will deadlock. This should be fixed at some point in the
//! future. The jobserver thread has a similar problem, though it will time
//! out after 1 second.
//!
//! To access the message queue, each running `Job` is given its own [`JobState`],
//! containing everything it needs to communicate with the main thread.
//!
//! See [`Message`] for all available message kinds.
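//!
//! As a rule of thumb, the guidance above boils down to (a sketch, not an
//! exhaustive list):
//!
//! ```text
//! messages.push(Message::Token(..));          // priority: must not block
//! messages.push(Message::Finish(..));         // priority: must not block
//! messages.push_bounded(Message::Stdout(..)); // printed output: may block
//! ```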
//!
//! [^parallel-rustc]: In fact, the `jobserver` that Cargo uses also manages the
//!     allocation of tokens to rustc beyond the implicit token each rustc owns
//!     (i.e., the ones used for parallel LLVM work and parallel rustc threads).
//!     See also ["Rust Compiler Development Guide: Parallel Compilation"]
//!     and [this comment][rustc-codegen] in rust-lang/rust.
//!
//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
//! [jobserver]: https://docs.rs/jobserver
//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded

mod job;
mod job_state;

use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fmt::Write as _;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::{self, Scope};
use std::time::Duration;
use std::{env, io};

use anyhow::{Context as _, format_err};
use jobserver::{Acquired, HelperThread};
use semver::Version;
use tracing::{debug, trace};

pub use self::job::Freshness::{self, Dirty, Fresh};
pub use self::job::{Job, Work};
pub use self::job_state::JobState;
use super::BuildContext;
use super::BuildRunner;
use super::CompileMode;
use super::Unit;
use super::UnitIndex;
use super::custom_build::Severity;
use super::timings::SectionTiming;
use super::timings::Timings;
use crate::core::compiler::descriptive_pkg_name;
use crate::core::compiler::future_incompat::{
    self, FutureBreakageItem, FutureIncompatReportPackage,
};
use crate::core::resolver::ResolveBehavior;
use crate::core::{PackageId, Shell, TargetKind};
use crate::util::CargoResult;
use crate::util::context::WarningHandling;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::errors::AlreadyPrintedError;
use crate::util::machine_message::{self, Message as _};
use crate::util::{self, internal};
use crate::util::{DependencyQueue, GlobalContext, Progress, ProgressStyle, Queue};

/// This structure is backed by the `DependencyQueue` type and manages the
/// queueing of compilation steps for each package. Packages enqueue units of
/// work and then later on the entire graph is converted to `DrainState` and
/// executed.
pub struct JobQueue<'gctx> {
    queue: DependencyQueue<Unit, Artifact, Job>,
    counts: HashMap<PackageId, usize>,
    timings: Timings<'gctx>,
}

/// This structure is backed by the `DependencyQueue` type and manages the
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
///
/// It is created from `JobQueue` when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
struct DrainState<'gctx> {
    // This is the length of the DependencyQueue when starting out
    total_units: usize,

    queue: DependencyQueue<Unit, Artifact, Job>,
    messages: Arc<Queue<Message>>,
    /// Diagnostic deduplication support.
    diag_dedupe: DiagDedupe<'gctx>,
    /// Count of warnings, used to print a summary after the job succeeds
    warning_count: HashMap<JobId, WarningCount>,
    active: HashMap<JobId, Unit>,
    compiled: HashSet<PackageId>,
    documented: HashSet<PackageId>,
    scraped: HashSet<PackageId>,
    counts: HashMap<PackageId, usize>,
    progress: Progress<'gctx>,
    next_id: u32,
    timings: Timings<'gctx>,

    /// Map from unit index to unit, for looking up dependency information.
    index_to_unit: HashMap<UnitIndex, Unit>,

    /// Tokens that are currently owned by this Cargo, and may be "associated"
    /// with a rustc process. They may also be unused, though if so will be
    /// dropped on the next loop iteration.
    ///
    /// Note that the length of this may be zero, but we will still spawn work,
    /// as we share the implicit token given to this Cargo process with a
    /// single rustc process.
    tokens: Vec<Acquired>,

    /// The list of jobs that we have not yet started executing, but have
    /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
    /// allow us to request jobserver tokens pretty early.
    pending_queue: Vec<(Unit, Job, usize)>,
    print: DiagnosticPrinter<'gctx>,

    /// How many jobs we've finished
    finished: usize,
    per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
}

/// Count of warnings, used to print a summary after the job succeeds
#[derive(Default, Clone)]
pub struct WarningCount {
    /// total number of warnings
    pub total: usize,
    /// number of lint warnings
    pub lints: usize,
    /// number of warnings that were suppressed because they
    /// were duplicates of a previous warning
    pub duplicates: usize,
    /// number of fixable warnings; set to `NotAllowed`
    /// if any errors have been seen for the current target
    pub fixable: FixableWarnings,
}

impl WarningCount {
    /// If an error is seen this should be called
    /// to set `fixable` to `NotAllowed`
    fn disallow_fixable(&mut self) {
        self.fixable = FixableWarnings::NotAllowed;
    }

    /// Checks whether fixable warnings are allowed.
    /// Fixable warnings are allowed if no errors have
    /// been seen for the current target. If an error
    /// was seen, `fixable` will be `NotAllowed`.
    fn fixable_allowed(&self) -> bool {
        match &self.fixable {
            FixableWarnings::NotAllowed => false,
            _ => true,
        }
    }
}

/// Used to keep track of how many fixable warnings there are
/// and whether fixable warnings are allowed
#[derive(Default, Copy, Clone)]
pub enum FixableWarnings {
    NotAllowed,
    #[default]
    Zero,
    Positive(usize),
}

pub struct ErrorsDuringDrain {
    pub count: usize,
}

struct ErrorToHandle {
    error: anyhow::Error,

    /// This field is true for "interesting" errors and false for "mundane"
    /// errors. If false, we print the above error only if it's the first one
    /// encountered so far while draining the job queue.
    ///
    /// At most places that an error is propagated, we set this to false to
    /// avoid scenarios where Cargo might end up spewing tons of redundant error
    /// messages. For example if an i/o stream got closed somewhere, we don't
    /// care about individually reporting every thread that it broke; just the
    /// first is enough.
    ///
    /// The exception where `print_always` is true is that we do report every
    /// instance of a rustc invocation that failed with diagnostics. This
    /// corresponds to errors from `Message::Finish`.
    print_always: bool,
}

impl<E> From<E> for ErrorToHandle
where
    anyhow::Error: From<E>,
{
    fn from(error: E) -> Self {
        ErrorToHandle {
            error: anyhow::Error::from(error),
            print_always: false,
        }
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct JobId(pub u32);

impl std::fmt::Display for JobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Handler for deduplicating diagnostics.
struct DiagDedupe<'gctx> {
    seen: RefCell<HashSet<u64>>,
    gctx: &'gctx GlobalContext,
}

impl<'gctx> DiagDedupe<'gctx> {
    fn new(gctx: &'gctx GlobalContext) -> Self {
        DiagDedupe {
            seen: RefCell::new(HashSet::new()),
            gctx,
        }
    }

    /// Emits a diagnostic message.
    ///
    /// Returns `true` if the message was emitted, or `false` if it was
    /// suppressed for being a duplicate.
    fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
        let h = util::hash_u64(diag);
        if !self.seen.borrow_mut().insert(h) {
            return Ok(false);
        }
        let mut shell = self.gctx.shell();
        shell.print_ansi_stderr(diag.as_bytes())?;
        shell.err().write_all(b"\n")?;
        Ok(true)
    }
}

/// Possible artifacts that can be produced by compilations, used as edge values
/// in the dependency graph.
///
/// As edge values we can have multiple kinds of edges depending on one node,
/// for example some units may only depend on the metadata for an rlib while
/// others depend on the full rlib. This `Artifact` enum is used to distinguish
/// this case and track the progress of compilations as they proceed.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum Artifact {
    /// A generic placeholder for "depends on everything run by a step" and
    /// means that we can't start the next compilation until the previous has
    /// finished entirely.
    All,

    /// A node indicating that we only depend on the metadata of a compilation,
    /// but the compilation is typically also producing an rlib. We can start
    /// our step, however, before the full rlib is available.
    Metadata,
}

enum Message {
    Run(JobId, String),
    Stdout(String),
    Stderr(String),

    // This is for general stderr output from subprocesses
    Diagnostic {
        id: JobId,
        level: String,
        diag: String,
        lint: bool,
        fixable: bool,
    },
    // This handles duplicate output that is suppressed, for showing
    // only a count of duplicate messages instead
    WarningCount {
        id: JobId,
        lint: bool,
        emitted: bool,
        fixable: bool,
    },
    // This is for warnings generated by Cargo's interpretation of the
    // subprocess output, e.g. scrape-examples prints a warning if a
    // unit fails to be scraped
    Warning {
        id: JobId,
        warning: String,
    },

    FixDiagnostic(diagnostic_server::Message),
    Token(io::Result<Acquired>),
    Finish(JobId, Artifact, CargoResult<()>),
    FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
    SectionTiming(JobId, SectionTiming),
}

impl<'gctx> JobQueue<'gctx> {
    pub fn new(bcx: &BuildContext<'_, 'gctx>) -> JobQueue<'gctx> {
        JobQueue {
            queue: DependencyQueue::new(),
            counts: HashMap::new(),
            timings: Timings::new(bcx),
        }
    }

    pub fn enqueue(
        &mut self,
        build_runner: &BuildRunner<'_, 'gctx>,
        unit: &Unit,
        job: Job,
    ) -> CargoResult<()> {
        let dependencies = build_runner.unit_deps(unit);
        let mut queue_deps = dependencies
            .iter()
            .filter(|dep| {
                // Binaries aren't actually needed to *compile* tests, just to run
                // them, so we don't include this dependency edge in the job graph.
                // But we shouldn't filter out dependencies being scraped for Rustdoc.
                (!dep.unit.target.is_test() && !dep.unit.target.is_bin())
                    || dep.unit.artifact.is_true()
                    || dep.unit.mode.is_doc_scrape()
            })
            .map(|dep| {
                // Handle the case here where our `unit -> dep` dependency may
                // only require the metadata, not the full compilation to
                // finish. Use the tables in `build_runner` to figure out what
                // kind of artifact is associated with this dependency.
                let artifact = if build_runner.only_requires_rmeta(unit, &dep.unit) {
                    Artifact::Metadata
                } else {
                    Artifact::All
                };
                (dep.unit.clone(), artifact)
            })
            .collect::<HashMap<_, _>>();

        // This is somewhat tricky, but we may need to synthesize some
        // dependencies for this target if it requires full upstream
        // compilations to have completed. Because of pipelining, some
        // dependency edges may be `Metadata` due to the above clause (as
        // opposed to everything being `All`). For example consider:
        //
        //   a (binary)
        //   └ b (lib)
        //       └ c (lib)
        //
        // Here the dependency edge from B to C will be `Metadata`, and the
        // dependency edge from A to B will be `All`. For A to be compiled,
        // however, it currently actually needs the full rlib of C. This means
        // that we need to synthesize a dependency edge for the dependency graph
        // from A to C. That's done here.
        //
        // This will walk all dependencies of the current target, and if any of
        // *their* dependencies are `Metadata` then we depend on the `All` of
        // the target as well. This should ensure that edges changed to
        // `Metadata` propagate upwards `All` dependencies to anything that
        // transitively contains the `Metadata` edge.
        if unit.requires_upstream_objects() {
            for dep in dependencies {
                depend_on_deps_of_deps(build_runner, &mut queue_deps, dep.unit.clone());
            }

            fn depend_on_deps_of_deps(
                build_runner: &BuildRunner<'_, '_>,
                deps: &mut HashMap<Unit, Artifact>,
                unit: Unit,
            ) {
                for dep in build_runner.unit_deps(&unit) {
                    if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
                        depend_on_deps_of_deps(build_runner, deps, dep.unit.clone());
                    }
                }
            }
        }

        // For now we use a fixed placeholder value for the cost of each unit, but
        // in the future this could be used to allow users to provide hints about
        // relative expected costs of units, or this could be automatically set in
        // a smarter way using timing data from a previous compilation.
        self.queue.queue(unit.clone(), job, queue_deps, 100);
        *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
        Ok(())
    }

    /// Executes all jobs necessary to build the dependency graph.
    ///
    /// This function will spawn off `config.jobs()` workers to build all of the
    /// necessary dependencies, in order. Freshness is propagated as far as
    /// possible along each dependency chain.
    #[tracing::instrument(skip_all)]
    pub fn execute(mut self, build_runner: &mut BuildRunner<'_, '_>) -> CargoResult<()> {
        self.queue.queue_finished();

        let progress =
            Progress::with_style("Building", ProgressStyle::Ratio, build_runner.bcx.gctx);
        let state = DrainState {
            total_units: self.queue.len(),
            queue: self.queue,
            // 100 here is somewhat arbitrary. It is a few screenfulls of
            // output, and hopefully at most a few megabytes of memory for
            // typical messages. If you change this, please update the test
            // caching_large_output, too.
            messages: Arc::new(Queue::new(100)),
            diag_dedupe: DiagDedupe::new(build_runner.bcx.gctx),
            warning_count: HashMap::new(),
            active: HashMap::new(),
            compiled: HashSet::new(),
            documented: HashSet::new(),
            scraped: HashSet::new(),
            counts: self.counts,
            progress,
            next_id: 0,
            timings: self.timings,
            index_to_unit: build_runner
                .bcx
                .unit_to_index
                .iter()
                .map(|(unit, &index)| (index, unit.clone()))
                .collect(),
            tokens: Vec::new(),
            pending_queue: Vec::new(),
            print: DiagnosticPrinter::new(
                build_runner.bcx.gctx,
                &build_runner.bcx.rustc().workspace_wrapper,
            ),
            finished: 0,
            per_package_future_incompat_reports: Vec::new(),
        };

        // Create a helper thread for acquiring jobserver tokens
        let messages = state.messages.clone();
        let helper = build_runner
            .jobserver
            .clone()
            .into_helper_thread(move |token| {
                messages.push(Message::Token(token));
            })
            .context("failed to create helper thread for jobserver management")?;

        // Create a helper thread to manage the diagnostics for rustfix if
        // necessary.
        let messages = state.messages.clone();
        // It is important that this uses `push` instead of `push_bounded` for
        // now. If someone wants to fix this to be bounded, the `drop`
        // implementation needs to be changed to avoid possible deadlocks.
        let _diagnostic_server = build_runner
            .bcx
            .build_config
            .rustfix_diagnostic_server
            .borrow_mut()
            .take()
            .map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));

        thread::scope(
            move |scope| match state.drain_the_queue(build_runner, scope, &helper) {
                Some(err) => Err(err),
                None => Ok(()),
            },
        )
    }
}

impl<'gctx> DrainState<'gctx> {
    fn spawn_work_if_possible<'s>(
        &mut self,
        build_runner: &mut BuildRunner<'_, '_>,
        jobserver_helper: &HelperThread,
        scope: &'s Scope<'s, '_>,
    ) -> CargoResult<()> {
        // Dequeue as much work as we can, learning about everything
        // possible that can run. Note that this is also the point where we
        // start requesting job tokens. Each job after the first needs to
        // request a token.
        while let Some((unit, job, priority)) = self.queue.dequeue() {
            // We want to keep the pieces of work in the `pending_queue` sorted
            // by their priorities, and insert the current job at its correctly
            // sorted position: following the lower priority jobs, and the ones
            // with the same priority (since they were dequeued before the
            // current one, we also keep that relation).
            let idx = self
                .pending_queue
                .partition_point(|&(_, _, p)| p <= priority);
            self.pending_queue.insert(idx, (unit, job, priority));
            if self.active.len() + self.pending_queue.len() > 1 {
                jobserver_helper.request_token();
            }
        }

        // Now that we've learned of all possible work that we can execute
        // try to spawn it so long as we've got a jobserver token which says
        // we're able to perform some parallel work.
        // The `pending_queue` is sorted in ascending priority order, and we
        // remove items from its end to schedule the highest priority items
        // sooner.
        while self.has_extra_tokens() && !self.pending_queue.is_empty() {
            let (unit, job, _) = self.pending_queue.pop().unwrap();
            *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
            // Print out some nice progress information.
            // NOTE: An error here will drop the job without starting it.
            // That should be OK, since we want to exit as soon as
            // possible during an error.
            self.note_working_on(
                build_runner.bcx.gctx,
                build_runner.bcx.ws.root(),
                &unit,
                job.freshness(),
            )?;
            self.run(&unit, job, build_runner, scope);
        }

        Ok(())
    }

    fn has_extra_tokens(&self) -> bool {
        self.active.len() < self.tokens.len() + 1
    }

    fn handle_event(
        &mut self,
        build_runner: &mut BuildRunner<'_, '_>,
        event: Message,
    ) -> Result<(), ErrorToHandle> {
        let warning_handling = build_runner.bcx.gctx.warning_handling()?;
        match event {
            Message::Run(id, cmd) => {
                build_runner
                    .bcx
                    .gctx
                    .shell()
                    .verbose(|c| c.status("Running", &cmd))?;
                self.timings
                    .unit_start(build_runner, id, self.active[&id].clone());
            }
            Message::Stdout(out) => {
                writeln!(build_runner.bcx.gctx.shell().out(), "{}", out)?;
            }
            Message::Stderr(err) => {
                let mut shell = build_runner.bcx.gctx.shell();
                shell.print_ansi_stderr(err.as_bytes())?;
                shell.err().write_all(b"\n")?;
            }
            Message::Diagnostic {
                id,
                level,
                diag,
                lint,
                fixable,
            } => {
                let emitted = self.diag_dedupe.emit_diag(&diag)?;
                if level == "warning" {
                    self.bump_warning_count(id, lint, emitted, fixable);
                }
                if level == "error" {
                    let count = self.warning_count.entry(id).or_default();
                    // If there is an error, the `cargo fix` message should not show
                    count.disallow_fixable();
                }
            }
            Message::Warning { id, warning } => {
                if warning_handling != WarningHandling::Allow {
                    build_runner.bcx.gctx.shell().warn(warning)?;
                }
                let lint = false;
                let emitted = true;
                let fixable = false;
                self.bump_warning_count(id, lint, emitted, fixable);
            }
            Message::WarningCount {
                id,
                lint,
                emitted,
                fixable,
            } => {
                self.bump_warning_count(id, lint, emitted, fixable);
            }
            Message::FixDiagnostic(msg) => {
                self.print.print(&msg)?;
            }
            Message::Finish(id, artifact, mut result) => {
                let unit = match artifact {
                    // If `id` has completely finished we remove it
                    // from the `active` map ...
                    Artifact::All => {
                        trace!("end: {:?}", id);
                        self.finished += 1;
                        let unit = self.active.remove(&id).unwrap();
                        // An error could add an entry for a `Unit`
                        // with 0 warnings but having fixable
                        // warnings be disallowed
                        let count = self
                            .warning_count
                            .get(&id)
                            .filter(|count| 0 < count.total)
                            .cloned();
                        if let Some(count) = count {
                            self.report_warning_count(
                                build_runner,
                                &unit,
                                &count,
                                &build_runner.bcx.rustc().workspace_wrapper,
                                warning_handling,
                            );
                            let stop_on_warnings = warning_handling == WarningHandling::Deny
                                && 0 < count.lints
                                && !build_runner.bcx.build_config.keep_going;
                            if stop_on_warnings {
                                result = Err(anyhow::format_err!(
                                    "warnings are denied by `build.warnings` configuration"
                                ))
                            }
                        }
                        unit
                    }
                    // ... otherwise if it hasn't finished we leave it
                    // in there as we'll get another `Finish` later on.
                    Artifact::Metadata => {
                        trace!("end (meta): {:?}", id);
                        self.active[&id].clone()
                    }
                };
                debug!("end ({:?}): {:?}", unit, result);
                match result {
                    Ok(()) => self.finish(id, &unit, artifact, build_runner)?,
                    Err(_) if build_runner.bcx.unit_can_fail_for_docscraping(&unit) => {
                        build_runner
                            .failed_scrape_units
                            .lock()
                            .unwrap()
                            .insert(build_runner.files().metadata(&unit).unit_id());
                        self.queue.finish(&unit, &artifact);
                    }
                    Err(error) => {
                        let show_warnings = true;
                        self.emit_log_messages(&unit, build_runner, show_warnings)?;
                        self.back_compat_notice(build_runner, &unit)?;
                        return Err(ErrorToHandle {
                            error,
                            print_always: true,
                        });
                    }
                }
            }
            Message::FutureIncompatReport(id, items) => {
                let unit = &self.active[&id];
                let package_id = unit.pkg.package_id();
                let is_local = unit.is_local();
                self.per_package_future_incompat_reports
                    .push(FutureIncompatReportPackage {
                        package_id,
                        is_local,
                        items,
                    });
            }
            Message::Token(acquired_token) => {
                let token = acquired_token.context("failed to acquire jobserver token")?;
                self.tokens.push(token);
            }
            Message::SectionTiming(id, section) => {
                self.timings.unit_section_timing(build_runner, id, &section);
            }
        }

        Ok(())
    }

    // This will also tick the progress bar as appropriate
    fn wait_for_events(&mut self) -> Vec<Message> {
        // Drain all events at once to avoid displaying the progress bar
        // unnecessarily. If there are no events we actually block waiting for
        // an event, but we keep a "heartbeat" going to allow `record_cpu`
        // to run above to calculate CPU usage over time. To do this we
        // listen for a message with a timeout, and on timeout we run the
        // previous parts of the loop again.
        let mut events = self.messages.try_pop_all();
        if events.is_empty() {
            loop {
                self.tick_progress();
                self.tokens.truncate(self.active.len() - 1);
                match self.messages.pop(Duration::from_millis(500)) {
                    Some(message) => {
                        events.push(message);
                        break;
                    }
                    None => continue,
                }
            }
        }
        events
    }

    /// This is the "main" loop, where Cargo does all work to run the
    /// compiler.
    ///
    /// This returns an Option to prevent the use of `?` on `Result` types
    /// because it is important for the loop to carefully handle errors.
    fn drain_the_queue<'s>(
        mut self,
        build_runner: &mut BuildRunner<'_, '_>,
        scope: &'s Scope<'s, '_>,
        jobserver_helper: &HelperThread,
    ) -> Option<anyhow::Error> {
        trace!("queue: {:#?}", self.queue);

        // Iteratively execute the entire dependency graph. Each turn of the
        // loop starts out by scheduling as much work as possible (up to the
        // maximum number of parallel jobs we have tokens for). A local queue
        // is maintained separately from the main dependency queue as one
        // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
        // in one package).
        //
        // After a job has finished we update our internal state if it was
        // successful and otherwise wait for pending work to finish if it failed
        // and then immediately return (or keep going, if requested by the build
        // config).
        let mut errors = ErrorsDuringDrain { count: 0 };
        // CAUTION! Do not use `?` or break out of the loop early. Every error
        // must be handled in such a way that the loop is still allowed to
        // drain event messages.
        loop {
            if errors.count == 0 || build_runner.bcx.build_config.keep_going {
                if let Err(e) = self.spawn_work_if_possible(build_runner, jobserver_helper, scope) {
                    self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
                }
            }

            // If after all that we're not actually running anything then we're
            // done!
            if self.active.is_empty() {
                break;
            }

            // And finally, before we block waiting for the next event, drop any
            // excess tokens we may have accidentally acquired. Due to how our
            // jobserver interface is architected we may acquire a token that we
            // don't actually use, and if this happens just relinquish it back
            // to the jobserver itself.
            for event in self.wait_for_events() {
                if let Err(event_err) = self.handle_event(build_runner, event) {
                    self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, event_err);
                }
            }
        }
        self.progress.clear();

        let profile_name = build_runner.bcx.build_config.requested_profile;
        // NOTE: this may be a bit inaccurate, since this may not display the
        // profile for what was actually built. Profile overrides can change
        // these settings, and in some cases different targets are built with
        // different profiles. To be accurate, it would need to collect a
        // list of Units built, and maybe display a list of the different
        // profiles used. However, to keep it simple and compatible with old
        // behavior, we just display what the base profile is.
        let profile = build_runner.bcx.profiles.base_profile();
        let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
            "unoptimized"
        } else {
            "optimized"
        });
        if profile.debuginfo.is_turned_on() {
            opt_type += " + debuginfo";
        }

        let time_elapsed = util::elapsed(build_runner.bcx.gctx.creation_time().elapsed());
        if let Err(e) = self
            .timings
            .finished(build_runner, &errors.to_error())
            .context("failed to render timing report")
        {
            self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
        }
        if build_runner.bcx.build_config.emit_json() {
            let mut shell = build_runner.bcx.gctx.shell();
            let msg = machine_message::BuildFinished {
                success: errors.count == 0,
            }
            .to_json_string();
            if let Err(e) = writeln!(shell.out(), "{}", msg) {
                self.handle_error(&mut shell, &mut errors, e);
            }
        }

        if let Some(error) = errors.to_error() {
            // Any errors up to this point have already been printed via the
            // `display_error` inside `handle_error`.
            Some(anyhow::Error::new(AlreadyPrintedError::new(error)))
        } else if self.queue.is_empty() && self.pending_queue.is_empty() {
            let profile_link = build_runner.bcx.gctx.shell().err_hyperlink(
                "https://doc.rust-lang.org/cargo/reference/profiles.html#default-profiles",
            );
            let message = format!(
                "{profile_link}`{profile_name}` profile [{opt_type}]{profile_link:#} target(s) in {time_elapsed}",
            );
            // It doesn't really matter if this fails.
            let _ = build_runner.bcx.gctx.shell().status("Finished", message);
            future_incompat::save_and_display_report(
                build_runner.bcx,
                &self.per_package_future_incompat_reports,
            );

            None
        } else {
            debug!("queue: {:#?}", self.queue);
            Some(internal("finished with jobs still left in the queue"))
        }
    }

    fn handle_error(
        &mut self,
        shell: &mut Shell,
        err_state: &mut ErrorsDuringDrain,
        new_err: impl Into<ErrorToHandle>,
    ) {
        let new_err = new_err.into();
        if new_err.print_always || err_state.count == 0 {
            crate::display_error(&new_err.error, shell);
            if err_state.count == 0 && !self.active.is_empty() {
                self.progress.indicate_error();
                let _ = shell.warn("build failed, waiting for other jobs to finish...");
            }
            err_state.count += 1;
        } else {
            tracing::warn!("{:?}", new_err.error);
        }
    }

    // This also records CPU usage and marks concurrency; we roughly want to do
    // this as often as we spin on the events receiver (at least every 500ms or
    // so).
    fn tick_progress(&mut self) {
        // Record some timing information if `--timings` is enabled, and
        // this'll end up being a noop if we're not recording this
        // information.
        self.timings.record_cpu();

        let active_names = self
            .active
            .values()
            .map(|u| self.name_for_progress(u))
            .collect::<Vec<_>>();
        let _ = self.progress.tick_now(
            self.finished,
            self.total_units,
            &format!(": {}", active_names.join(", ")),
        );
    }

    fn name_for_progress(&self, unit: &Unit) -> String {
        let pkg_name = unit.pkg.name();
        let target_name = unit.target.name();
        match unit.mode {
            CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
            CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
            CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
                TargetKind::Lib(_) => format!("{}(test)", target_name),
                TargetKind::CustomBuild => panic!("cannot test build script"),
                TargetKind::Bin => format!("{}(bin test)", target_name),
                TargetKind::Test => format!("{}(test)", target_name),
                TargetKind::Bench => format!("{}(bench)", target_name),
                TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
                    format!("{}(example test)", target_name)
                }
            },
            _ => match unit.target.kind() {
                TargetKind::Lib(_) => pkg_name.to_string(),
                TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
                TargetKind::Bin => format!("{}(bin)", target_name),
                TargetKind::Test => format!("{}(test)", target_name),
                TargetKind::Bench => format!("{}(bench)", target_name),
                TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
                    format!("{}(example)", target_name)
                }
            },
        }
    }

    /// Executes a job.
    ///
    /// Fresh jobs block until finished (which should be very fast!), while
    /// dirty jobs spawn a thread in the background and return immediately.
    fn run<'s>(
        &mut self,
        unit: &Unit,
        job: Job,
        build_runner: &BuildRunner<'_, '_>,
        scope: &'s Scope<'s, '_>,
    ) {
        let id = JobId(self.next_id);
        self.next_id = self.next_id.checked_add(1).unwrap();

        debug!("start {}: {:?}", id, unit);

        assert!(self.active.insert(id, unit.clone()).is_none());

        let messages = self.messages.clone();
        let is_fresh = job.freshness().is_fresh();
        let rmeta_required = build_runner.rmeta_required(unit);
        let lock_manager = build_runner.lock_manager.clone();

        let doit = move |diag_dedupe| {
            let state = JobState::new(id, messages, diag_dedupe, rmeta_required, lock_manager);
            state.run_to_finish(job);
        };

        match is_fresh {
            true => {
                // Running a fresh job on the same thread is often much faster than spawning a new
                // thread to run the job.
                doit(Some(&self.diag_dedupe));
            }
            false => {
                scope.spawn(move || doit(None));
            }
        }
    }

    fn emit_log_messages(
        &self,
        unit: &Unit,
        build_runner: &mut BuildRunner<'_, '_>,
        show_warnings: bool,
    ) -> CargoResult<()> {
        let outputs = build_runner.build_script_outputs.lock().unwrap();
        let Some(metadata_vec) = build_runner.find_build_script_metadatas(unit) else {
            return Ok(());
        };
        let bcx = &mut build_runner.bcx;
        for metadata in metadata_vec {
            if let Some(output) = outputs.get(metadata) {
                if !output.log_messages.is_empty()
                    && (show_warnings
                        || output
                            .log_messages
                            .iter()
                            .any(|(severity, _)| *severity == Severity::Error))
                {
                    let msg_with_package =
                        |msg: &str| format!("{}@{}: {}", unit.pkg.name(), unit.pkg.version(), msg);

                    for (severity, message) in output.log_messages.iter() {
                        match severity {
                            Severity::Error => {
                                bcx.gctx.shell().error(msg_with_package(message))?;
                            }
                            Severity::Warning => {
                                bcx.gctx.shell().warn(msg_with_package(message))?;
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn bump_warning_count(&mut self, id: JobId, lint: bool, emitted: bool, fixable: bool) {
        let count = self.warning_count.entry(id).or_default();
        count.total += 1;
        if lint {
            let unit = self.active.get(&id).unwrap();
            // If this is an upstream dep but we *do* want warnings, make sure that they
            // don't fail compilation.
            if unit.is_local() {
                count.lints += 1;
            }
        }
        if !emitted {
            count.duplicates += 1;
            // Don't add to fixable if it's already been emitted
        } else if fixable {
            // Do not add anything to the fixable warning count if
            // is `NotAllowed` since that indicates there was an
            // error while building this `Unit`
            if count.fixable_allowed() {
                count.fixable = match count.fixable {
                    FixableWarnings::NotAllowed => FixableWarnings::NotAllowed,
                    FixableWarnings::Zero => FixableWarnings::Positive(1),
                    FixableWarnings::Positive(fixable) => FixableWarnings::Positive(fixable + 1),
                };
            }
        }
    }

    /// Displays a final report of the warnings emitted by a particular job.
    fn report_warning_count(
        &mut self,
        runner: &mut BuildRunner<'_, '_>,
        unit: &Unit,
        count: &WarningCount,
        rustc_workspace_wrapper: &Option<PathBuf>,
        warning_handling: WarningHandling,
    ) {
        let gctx = runner.bcx.gctx;
        runner.compilation.lint_warning_count += count.lints;
        let mut message = descriptive_pkg_name(&unit.pkg.name(), &unit.target, &unit.mode);
        message.push_str(" generated ");
        match count.total {
            1 => message.push_str("1 warning"),
            n => {
                let _ = write!(message, "{} warnings", n);
            }
        };
        match count.duplicates {
            0 => {}
            1 => message.push_str(" (1 duplicate)"),
            n => {
                let _ = write!(message, " ({} duplicates)", n);
            }
        }
        // Only show the `cargo fix` message if it's a local `Unit`
        if unit.is_local() {
            // Do not show this if there are any errors or no fixable warnings
            if let FixableWarnings::Positive(fixable) = count.fixable {
                // `cargo fix` doesn't have an option for custom builds
                if !unit.target.is_custom_build() {
                    // To make sure the correct command is shown for `clippy` we
                    // check if `RUSTC_WORKSPACE_WRAPPER` is set and pointing towards
                    // `clippy-driver`.
                    let clippy = std::ffi::OsStr::new("clippy-driver");
                    let is_clippy = rustc_workspace_wrapper.as_ref().and_then(|x| x.file_stem())
                        == Some(clippy);

                    let command = if is_clippy {
                        "cargo clippy --fix"
                    } else {
                        "cargo fix"
                    };

                    let mut args =
                        format!("{} -p {}", unit.target.description_named(), unit.pkg.name());
                    if unit.mode.is_rustc_test()
                        && !(unit.target.is_test() || unit.target.is_bench())
                    {
                        args.push_str(" --tests");
                    }
                    let mut suggestions = format!("{} suggestion", fixable);
                    if fixable > 1 {
                        suggestions.push_str("s")
                    }

                    #[expect(clippy::disallowed_methods, reason = "consistency with clippy")]
                    let _ = write!(
                        message,
                        " (run `{command} --{args}{}` to apply {suggestions})",
                        if let Some(cli_lints_os) = env::var_os("CLIPPY_ARGS")
                            && let Ok(cli_lints) = cli_lints_os.into_string()
                            && is_clippy
                        {
                            // Clippy can take lints through the CLI; each lint flag is separated by "__CLIPPY_HACKERY__".
                            let cli_lints = cli_lints.replace("__CLIPPY_HACKERY__", " ");
                            let cli_lints = cli_lints.trim_ascii_end(); // Remove that last space left by __CLIPPY_HACKERY__
                            format!(" -- {cli_lints}")
                        } else {
                            "".to_owned()
                        }
                    );
                }
            }
        }
        // Errors are ignored here because it is tricky to handle them
        // correctly, and they aren't important.
        let _ = if warning_handling == WarningHandling::Deny && 0 < count.lints {
            gctx.shell().error(message)
        } else {
            gctx.shell().warn(message)
        };
    }

    fn finish(
        &mut self,
        id: JobId,
        unit: &Unit,
        artifact: Artifact,
        build_runner: &mut BuildRunner<'_, '_>,
    ) -> CargoResult<()> {
        if unit.mode.is_run_custom_build() {
            self.emit_log_messages(
                unit,
                build_runner,
                unit.show_warnings(build_runner.bcx.gctx),
            )?;
        }
        let unblocked = self.queue.finish(unit, &artifact);
        match artifact {
            Artifact::All => self.timings.unit_finished(build_runner, id, unblocked),
            Artifact::Metadata => self
                .timings
                .unit_rmeta_finished(build_runner, id, unblocked),
        }
        Ok(())
    }

    // This isn't super trivial because we don't want to print loads and
    // loads of information to the console, but we also want to produce a
    // faithful representation of what's happening. This is somewhat nuanced
    // as a package can start compiling *very* early on because of custom
    // build commands and such.
    //
    // In general, we try to print "Compiling" for the first nontrivial task
    // run for a package, regardless of when that is. We then don't print
    // out any more information for a package after we've printed it once.
    fn note_working_on(
        &mut self,
        gctx: &GlobalContext,
        ws_root: &Path,
        unit: &Unit,
        fresh: &Freshness,
    ) -> CargoResult<()> {
        if (self.compiled.contains(&unit.pkg.package_id())
            && !unit.mode.is_doc()
            && !unit.mode.is_doc_scrape())
            || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
            || (self.scraped.contains(&unit.pkg.package_id()) && unit.mode.is_doc_scrape())
        {
            return Ok(());
        }

        match fresh {
            // Any dirty stage which runs at least one command gets printed as
            // being a compiled package.
            Dirty(dirty_reason) => {
                if !dirty_reason.is_fresh_build() {
                    gctx.shell().verbose(|shell| {
                        dirty_reason.present_to(shell, unit, ws_root, &self.index_to_unit)
                    })?;
                }

                if unit.mode.is_doc() {
                    self.documented.insert(unit.pkg.package_id());
                    gctx.shell().status("Documenting", &unit.pkg)?;
                } else if unit.mode.is_doc_test() {
                    // Skip doc test.
                } else if unit.mode.is_doc_scrape() {
                    self.scraped.insert(unit.pkg.package_id());
                    gctx.shell().status("Scraping", &unit.pkg)?;
                } else {
                    self.compiled.insert(unit.pkg.package_id());
                    if unit.mode.is_check() {
                        gctx.shell().status("Checking", &unit.pkg)?;
                    } else {
                        gctx.shell().status("Compiling", &unit.pkg)?;
                    }
                }
            }
            Fresh => {
                // If doc tests are last, only print "Fresh" if nothing has been printed.
                if self.counts[&unit.pkg.package_id()] == 0
                    && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
                {
                    self.compiled.insert(unit.pkg.package_id());
                    gctx.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
                }
            }
        }
        Ok(())
    }

    fn back_compat_notice(
        &self,
        build_runner: &BuildRunner<'_, '_>,
        unit: &Unit,
    ) -> CargoResult<()> {
        if unit.pkg.name() != "diesel"
            || unit.pkg.version() >= &Version::new(1, 4, 8)
            || build_runner.bcx.ws.resolve_behavior() == ResolveBehavior::V1
            || !unit.pkg.package_id().source_id().is_registry()
            || !unit.features.is_empty()
        {
            return Ok(());
        }
        if !build_runner
            .bcx
            .unit_graph
            .keys()
            .any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
        {
            return Ok(());
        }
        build_runner.bcx.gctx.shell().note(
            "\
This error may be due to an interaction between diesel and Cargo's new
feature resolver. Try updating to diesel 1.4.8 to fix this error.
",
        )?;
        Ok(())
    }
}

impl ErrorsDuringDrain {
    fn to_error(&self) -> Option<anyhow::Error> {
        match self.count {
            0 => None,
            1 => Some(format_err!("1 job failed")),
            n => Some(format_err!("{} jobs failed", n)),
        }
    }
}