cargo/core/compiler/job_queue/mod.rs
1//! Management of the interaction between the main `cargo` and all spawned jobs.
2//!
3//! ## Overview
4//!
5//! This module implements a job queue. A job here represents a unit of work,
6//! which is roughly a rustc invocation, a build script run, or just a no-op.
7//! The job queue primarily handles the following things:
8//!
9//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job could be
10//! either executed on a spawned thread or ran on the same thread to avoid
11//! the threading overhead.
12//! * Controls the number of concurrency. It allocates and manages [`jobserver`]
13//! tokens to each spawned off rustc and build scripts.
14//! * Manages the communication between the main `cargo` process and its
15//! spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared
16//! across threads.
17//! * Schedules the execution order of each [`Job`]. Priorities are determined
18//! when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is
19//! relatively rudimentary and could likely be improved.
20//!
21//! A rough outline of building a queue and executing jobs is:
22//!
23//! 1. [`JobQueue::new`] to simply create one queue.
24//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue.
25//! 3. Consumes the queue and executes all jobs via [`JobQueue::execute`].
26//!
27//! The primary loop happens insides [`JobQueue::execute`], which is effectively
28//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name tells,
29//! the running state of the job queue getting drained.
30//!
31//! ## Jobserver
32//!
33//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
34//! relationship with each other. They share a single jobserver amongst what
35//! is potentially hundreds of threads of work on many-cored systems.
36//! The jobserver could come from either the environment (e.g., from a `make`
37//! invocation), or from Cargo creating its own jobserver server if there is no
38//! jobserver to inherit from.
39//!
40//! Cargo wants to complete the build as quickly as possible, fully saturating
41//! all cores (as constrained by the `-j=N`) parameter. Cargo also must not spawn
42//! more than N threads of work: the total amount of tokens we have floating
43//! around must always be limited to N.
44//!
45//! It is not really possible to optimally choose which crate should build
46//! first or last; nor is it possible to decide whether to give an additional
47//! token to rustc first or rather spawn a new crate of work. The algorithm in
48//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
49//! possible. In short, the jobserver relationship among Cargo and rustc
50//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing beyond rustc
51//! processes in terms of parallelism[^parallel-rustc].
52//!
53//! We integrate with the [jobserver] crate, originating from GNU make
54//! [POSIX jobserver], to make sure that build scripts which use make to
55//! build C code can cooperate with us on the number of used tokens and
56//! avoid overfilling the system we're on.
57//!
58//! ## Scheduling
59//!
60//! The current scheduling algorithm is not really polished. It is simply based
61//! on a dependency graph [`DependencyQueue`]. We continue adding nodes onto
62//! the graph until we finalize it. When the graph gets finalized, it finds the
63//! sum of the cost of each dependencies of each node, including transitively.
64//! The sum of dependency cost turns out to be the cost of each given node.
65//!
66//! At the time being, the cost is just passed as a fixed placeholder in
67//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities
68//! around it. For instance, we start persisting timing information for each
69//! build somewhere. For a subsequent build, we can look into the historical
70//! data and perform a PGO-like optimization to prioritize jobs, making a build
71//! fully pipelined.
72//!
73//! ## Message queue
74//!
75//! Each spawned thread running a process uses the message queue [`Queue`] to
76//! send messages back to the main thread (the one running `cargo`).
77//! The main thread coordinates everything, and handles printing output.
78//!
79//! It is important to be careful which messages use [`push`] vs [`push_bounded`].
80//! `push` is for priority messages (like tokens, or "finished") where the
81//! sender shouldn't block. We want to handle those so real work can proceed
82//! ASAP.
83//!
84//! `push_bounded` is only for messages being printed to stdout/stderr. Being
85//! bounded prevents a flood of messages causing a large amount of memory
86//! being used.
87//!
88//! `push` also avoids blocking which helps avoid deadlocks. For example, when
89//! the diagnostic server thread is dropped, it waits for the thread to exit.
90//! But if the thread is blocked on a full queue, and there is a critical
91//! error, the drop will deadlock. This should be fixed at some point in the
92//! future. The jobserver thread has a similar problem, though it will time
93//! out after 1 second.
94//!
95//! To access the message queue, each running `Job` is given its own [`JobState`],
96//! containing everything it needs to communicate with the main thread.
97//!
98//! See [`Message`] for all available message kinds.
99//!
100//! [^parallel-rustc]: In fact, `jobserver` that Cargo uses also manages the
101//! allocation of tokens to rustc beyond the implicit token each rustc owns
102//! (i.e., the ones used for parallel LLVM work and parallel rustc threads).
103//! See also ["Rust Compiler Development Guide: Parallel Compilation"]
104//! and [this comment][rustc-codegen] in rust-lang/rust.
105//!
106//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
107//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
108//! [jobserver]: https://docs.rs/jobserver
109//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
110//! [`push`]: Queue::push
111//! [`push_bounded`]: Queue::push_bounded
112
113mod job;
114mod job_state;
115
116use std::cell::RefCell;
117use std::collections::{HashMap, HashSet};
118use std::fmt::Write as _;
119use std::io;
120use std::path::{Path, PathBuf};
121use std::sync::Arc;
122use std::thread::{self, Scope};
123use std::time::Duration;
124
125use anyhow::{Context as _, format_err};
126use cargo_util::ProcessBuilder;
127use jobserver::{Acquired, HelperThread};
128use semver::Version;
129use tracing::{debug, trace};
130
131pub use self::job::Freshness::{self, Dirty, Fresh};
132pub use self::job::{Job, Work};
133pub use self::job_state::JobState;
134use super::build_runner::OutputFile;
135use super::custom_build::Severity;
136use super::timings::{SectionTiming, Timings};
137use super::{BuildContext, BuildPlan, BuildRunner, CompileMode, Unit};
138use crate::core::compiler::descriptive_pkg_name;
139use crate::core::compiler::future_incompat::{
140 self, FutureBreakageItem, FutureIncompatReportPackage,
141};
142use crate::core::resolver::ResolveBehavior;
143use crate::core::{PackageId, Shell, TargetKind};
144use crate::util::CargoResult;
145use crate::util::context::WarningHandling;
146use crate::util::diagnostic_server::{self, DiagnosticPrinter};
147use crate::util::errors::AlreadyPrintedError;
148use crate::util::machine_message::{self, Message as _};
149use crate::util::{self, internal};
150use crate::util::{DependencyQueue, GlobalContext, Progress, ProgressStyle, Queue};
151
152/// This structure is backed by the `DependencyQueue` type and manages the
153/// queueing of compilation steps for each package. Packages enqueue units of
154/// work and then later on the entire graph is converted to `DrainState` and
155/// executed.
156pub struct JobQueue<'gctx> {
157 queue: DependencyQueue<Unit, Artifact, Job>,
158 counts: HashMap<PackageId, usize>,
159 timings: Timings<'gctx>,
160}
161
162/// This structure is backed by the `DependencyQueue` type and manages the
163/// actual compilation step of each package. Packages enqueue units of work and
164/// then later on the entire graph is processed and compiled.
165///
166/// It is created from `JobQueue` when we have fully assembled the crate graph
167/// (i.e., all package dependencies are known).
168struct DrainState<'gctx> {
169 // This is the length of the DependencyQueue when starting out
170 total_units: usize,
171
172 queue: DependencyQueue<Unit, Artifact, Job>,
173 messages: Arc<Queue<Message>>,
174 /// Diagnostic deduplication support.
175 diag_dedupe: DiagDedupe<'gctx>,
176 /// Count of warnings, used to print a summary after the job succeeds
177 warning_count: HashMap<JobId, WarningCount>,
178 active: HashMap<JobId, Unit>,
179 compiled: HashSet<PackageId>,
180 documented: HashSet<PackageId>,
181 scraped: HashSet<PackageId>,
182 counts: HashMap<PackageId, usize>,
183 progress: Progress<'gctx>,
184 next_id: u32,
185 timings: Timings<'gctx>,
186
187 /// Tokens that are currently owned by this Cargo, and may be "associated"
188 /// with a rustc process. They may also be unused, though if so will be
189 /// dropped on the next loop iteration.
190 ///
191 /// Note that the length of this may be zero, but we will still spawn work,
192 /// as we share the implicit token given to this Cargo process with a
193 /// single rustc process.
194 tokens: Vec<Acquired>,
195
196 /// The list of jobs that we have not yet started executing, but have
197 /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
198 /// allow us to request jobserver tokens pretty early.
199 pending_queue: Vec<(Unit, Job, usize)>,
200 print: DiagnosticPrinter<'gctx>,
201
202 /// How many jobs we've finished
203 finished: usize,
204 per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
205}
206
207/// Count of warnings, used to print a summary after the job succeeds
208#[derive(Default)]
209pub struct WarningCount {
210 /// total number of warnings
211 pub total: usize,
212 /// number of warnings that were suppressed because they
213 /// were duplicates of a previous warning
214 pub duplicates: usize,
215 /// number of fixable warnings set to `NotAllowed`
216 /// if any errors have been seen for the current
217 /// target
218 pub fixable: FixableWarnings,
219}
220
221impl WarningCount {
222 /// If an error is seen this should be called
223 /// to set `fixable` to `NotAllowed`
224 fn disallow_fixable(&mut self) {
225 self.fixable = FixableWarnings::NotAllowed;
226 }
227
228 /// Checks fixable if warnings are allowed
229 /// fixable warnings are allowed if no
230 /// errors have been seen for the current
231 /// target. If an error was seen `fixable`
232 /// will be `NotAllowed`.
233 fn fixable_allowed(&self) -> bool {
234 match &self.fixable {
235 FixableWarnings::NotAllowed => false,
236 _ => true,
237 }
238 }
239}
240
241/// Used to keep track of how many fixable warnings there are
242/// and if fixable warnings are allowed
243#[derive(Default)]
244pub enum FixableWarnings {
245 NotAllowed,
246 #[default]
247 Zero,
248 Positive(usize),
249}
250
251pub struct ErrorsDuringDrain {
252 pub count: usize,
253}
254
255struct ErrorToHandle {
256 error: anyhow::Error,
257
258 /// This field is true for "interesting" errors and false for "mundane"
259 /// errors. If false, we print the above error only if it's the first one
260 /// encountered so far while draining the job queue.
261 ///
262 /// At most places that an error is propagated, we set this to false to
263 /// avoid scenarios where Cargo might end up spewing tons of redundant error
264 /// messages. For example if an i/o stream got closed somewhere, we don't
265 /// care about individually reporting every thread that it broke; just the
266 /// first is enough.
267 ///
268 /// The exception where `print_always` is true is that we do report every
269 /// instance of a rustc invocation that failed with diagnostics. This
270 /// corresponds to errors from `Message::Finish`.
271 print_always: bool,
272}
273
274impl<E> From<E> for ErrorToHandle
275where
276 anyhow::Error: From<E>,
277{
278 fn from(error: E) -> Self {
279 ErrorToHandle {
280 error: anyhow::Error::from(error),
281 print_always: false,
282 }
283 }
284}
285
286#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
287pub struct JobId(pub u32);
288
289impl std::fmt::Display for JobId {
290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291 write!(f, "{}", self.0)
292 }
293}
294
295/// Handler for deduplicating diagnostics.
296struct DiagDedupe<'gctx> {
297 seen: RefCell<HashSet<u64>>,
298 gctx: &'gctx GlobalContext,
299}
300
301impl<'gctx> DiagDedupe<'gctx> {
302 fn new(gctx: &'gctx GlobalContext) -> Self {
303 DiagDedupe {
304 seen: RefCell::new(HashSet::new()),
305 gctx,
306 }
307 }
308
309 /// Emits a diagnostic message.
310 ///
311 /// Returns `true` if the message was emitted, or `false` if it was
312 /// suppressed for being a duplicate.
313 fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
314 let h = util::hash_u64(diag);
315 if !self.seen.borrow_mut().insert(h) {
316 return Ok(false);
317 }
318 let mut shell = self.gctx.shell();
319 shell.print_ansi_stderr(diag.as_bytes())?;
320 shell.err().write_all(b"\n")?;
321 Ok(true)
322 }
323}
324
325/// Possible artifacts that can be produced by compilations, used as edge values
326/// in the dependency graph.
327///
328/// As edge values we can have multiple kinds of edges depending on one node,
329/// for example some units may only depend on the metadata for an rlib while
330/// others depend on the full rlib. This `Artifact` enum is used to distinguish
331/// this case and track the progress of compilations as they proceed.
332#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
333enum Artifact {
334 /// A generic placeholder for "depends on everything run by a step" and
335 /// means that we can't start the next compilation until the previous has
336 /// finished entirely.
337 All,
338
339 /// A node indicating that we only depend on the metadata of a compilation,
340 /// but the compilation is typically also producing an rlib. We can start
341 /// our step, however, before the full rlib is available.
342 Metadata,
343}
344
345enum Message {
346 Run(JobId, String),
347 BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
348 Stdout(String),
349 Stderr(String),
350
351 // This is for general stderr output from subprocesses
352 Diagnostic {
353 id: JobId,
354 level: String,
355 diag: String,
356 fixable: bool,
357 },
358 // This handles duplicate output that is suppressed, for showing
359 // only a count of duplicate messages instead
360 WarningCount {
361 id: JobId,
362 emitted: bool,
363 fixable: bool,
364 },
365 // This is for warnings generated by Cargo's interpretation of the
366 // subprocess output, e.g. scrape-examples prints a warning if a
367 // unit fails to be scraped
368 Warning {
369 id: JobId,
370 warning: String,
371 },
372
373 FixDiagnostic(diagnostic_server::Message),
374 Token(io::Result<Acquired>),
375 Finish(JobId, Artifact, CargoResult<()>),
376 FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
377 SectionTiming(JobId, SectionTiming),
378}
379
380impl<'gctx> JobQueue<'gctx> {
381 pub fn new(bcx: &BuildContext<'_, 'gctx>) -> JobQueue<'gctx> {
382 JobQueue {
383 queue: DependencyQueue::new(),
384 counts: HashMap::new(),
385 timings: Timings::new(bcx, &bcx.roots),
386 }
387 }
388
389 pub fn enqueue(
390 &mut self,
391 build_runner: &BuildRunner<'_, 'gctx>,
392 unit: &Unit,
393 job: Job,
394 ) -> CargoResult<()> {
395 let dependencies = build_runner.unit_deps(unit);
396 let mut queue_deps = dependencies
397 .iter()
398 .filter(|dep| {
399 // Binaries aren't actually needed to *compile* tests, just to run
400 // them, so we don't include this dependency edge in the job graph.
401 // But we shouldn't filter out dependencies being scraped for Rustdoc.
402 (!dep.unit.target.is_test() && !dep.unit.target.is_bin())
403 || dep.unit.artifact.is_true()
404 || dep.unit.mode.is_doc_scrape()
405 })
406 .map(|dep| {
407 // Handle the case here where our `unit -> dep` dependency may
408 // only require the metadata, not the full compilation to
409 // finish. Use the tables in `build_runner` to figure out what
410 // kind of artifact is associated with this dependency.
411 let artifact = if build_runner.only_requires_rmeta(unit, &dep.unit) {
412 Artifact::Metadata
413 } else {
414 Artifact::All
415 };
416 (dep.unit.clone(), artifact)
417 })
418 .collect::<HashMap<_, _>>();
419
420 // This is somewhat tricky, but we may need to synthesize some
421 // dependencies for this target if it requires full upstream
422 // compilations to have completed. Because of pipelining, some
423 // dependency edges may be `Metadata` due to the above clause (as
424 // opposed to everything being `All`). For example consider:
425 //
426 // a (binary)
427 // └ b (lib)
428 // └ c (lib)
429 //
430 // Here the dependency edge from B to C will be `Metadata`, and the
431 // dependency edge from A to B will be `All`. For A to be compiled,
432 // however, it currently actually needs the full rlib of C. This means
433 // that we need to synthesize a dependency edge for the dependency graph
434 // from A to C. That's done here.
435 //
436 // This will walk all dependencies of the current target, and if any of
437 // *their* dependencies are `Metadata` then we depend on the `All` of
438 // the target as well. This should ensure that edges changed to
439 // `Metadata` propagate upwards `All` dependencies to anything that
440 // transitively contains the `Metadata` edge.
441 if unit.requires_upstream_objects() {
442 for dep in dependencies {
443 depend_on_deps_of_deps(build_runner, &mut queue_deps, dep.unit.clone());
444 }
445
446 fn depend_on_deps_of_deps(
447 build_runner: &BuildRunner<'_, '_>,
448 deps: &mut HashMap<Unit, Artifact>,
449 unit: Unit,
450 ) {
451 for dep in build_runner.unit_deps(&unit) {
452 if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
453 depend_on_deps_of_deps(build_runner, deps, dep.unit.clone());
454 }
455 }
456 }
457 }
458
459 // For now we use a fixed placeholder value for the cost of each unit, but
460 // in the future this could be used to allow users to provide hints about
461 // relative expected costs of units, or this could be automatically set in
462 // a smarter way using timing data from a previous compilation.
463 self.queue.queue(unit.clone(), job, queue_deps, 100);
464 *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
465 Ok(())
466 }
467
468 /// Executes all jobs necessary to build the dependency graph.
469 ///
470 /// This function will spawn off `config.jobs()` workers to build all of the
471 /// necessary dependencies, in order. Freshness is propagated as far as
472 /// possible along each dependency chain.
473 #[tracing::instrument(skip_all)]
474 pub fn execute(
475 mut self,
476 build_runner: &mut BuildRunner<'_, '_>,
477 plan: &mut BuildPlan,
478 ) -> CargoResult<()> {
479 self.queue.queue_finished();
480
481 let progress =
482 Progress::with_style("Building", ProgressStyle::Ratio, build_runner.bcx.gctx);
483 let state = DrainState {
484 total_units: self.queue.len(),
485 queue: self.queue,
486 // 100 here is somewhat arbitrary. It is a few screenfulls of
487 // output, and hopefully at most a few megabytes of memory for
488 // typical messages. If you change this, please update the test
489 // caching_large_output, too.
490 messages: Arc::new(Queue::new(100)),
491 diag_dedupe: DiagDedupe::new(build_runner.bcx.gctx),
492 warning_count: HashMap::new(),
493 active: HashMap::new(),
494 compiled: HashSet::new(),
495 documented: HashSet::new(),
496 scraped: HashSet::new(),
497 counts: self.counts,
498 progress,
499 next_id: 0,
500 timings: self.timings,
501 tokens: Vec::new(),
502 pending_queue: Vec::new(),
503 print: DiagnosticPrinter::new(
504 build_runner.bcx.gctx,
505 &build_runner.bcx.rustc().workspace_wrapper,
506 ),
507 finished: 0,
508 per_package_future_incompat_reports: Vec::new(),
509 };
510
511 // Create a helper thread for acquiring jobserver tokens
512 let messages = state.messages.clone();
513 let helper = build_runner
514 .jobserver
515 .clone()
516 .into_helper_thread(move |token| {
517 messages.push(Message::Token(token));
518 })
519 .context("failed to create helper thread for jobserver management")?;
520
521 // Create a helper thread to manage the diagnostics for rustfix if
522 // necessary.
523 let messages = state.messages.clone();
524 // It is important that this uses `push` instead of `push_bounded` for
525 // now. If someone wants to fix this to be bounded, the `drop`
526 // implementation needs to be changed to avoid possible deadlocks.
527 let _diagnostic_server = build_runner
528 .bcx
529 .build_config
530 .rustfix_diagnostic_server
531 .borrow_mut()
532 .take()
533 .map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
534
535 thread::scope(move |scope| {
536 match state.drain_the_queue(build_runner, plan, scope, &helper) {
537 Some(err) => Err(err),
538 None => Ok(()),
539 }
540 })
541 }
542}
543
544impl<'gctx> DrainState<'gctx> {
545 fn spawn_work_if_possible<'s>(
546 &mut self,
547 build_runner: &mut BuildRunner<'_, '_>,
548 jobserver_helper: &HelperThread,
549 scope: &'s Scope<'s, '_>,
550 ) -> CargoResult<()> {
551 // Dequeue as much work as we can, learning about everything
552 // possible that can run. Note that this is also the point where we
553 // start requesting job tokens. Each job after the first needs to
554 // request a token.
555 while let Some((unit, job, priority)) = self.queue.dequeue() {
556 // We want to keep the pieces of work in the `pending_queue` sorted
557 // by their priorities, and insert the current job at its correctly
558 // sorted position: following the lower priority jobs, and the ones
559 // with the same priority (since they were dequeued before the
560 // current one, we also keep that relation).
561 let idx = self
562 .pending_queue
563 .partition_point(|&(_, _, p)| p <= priority);
564 self.pending_queue.insert(idx, (unit, job, priority));
565 if self.active.len() + self.pending_queue.len() > 1 {
566 jobserver_helper.request_token();
567 }
568 }
569
570 // Now that we've learned of all possible work that we can execute
571 // try to spawn it so long as we've got a jobserver token which says
572 // we're able to perform some parallel work.
573 // The `pending_queue` is sorted in ascending priority order, and we
574 // remove items from its end to schedule the highest priority items
575 // sooner.
576 while self.has_extra_tokens() && !self.pending_queue.is_empty() {
577 let (unit, job, _) = self.pending_queue.pop().unwrap();
578 *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
579 if !build_runner.bcx.build_config.build_plan {
580 // Print out some nice progress information.
581 // NOTE: An error here will drop the job without starting it.
582 // That should be OK, since we want to exit as soon as
583 // possible during an error.
584 self.note_working_on(
585 build_runner.bcx.gctx,
586 build_runner.bcx.ws.root(),
587 &unit,
588 job.freshness(),
589 )?;
590 }
591 self.run(&unit, job, build_runner, scope);
592 }
593
594 Ok(())
595 }
596
597 fn has_extra_tokens(&self) -> bool {
598 self.active.len() < self.tokens.len() + 1
599 }
600
601 fn handle_event(
602 &mut self,
603 build_runner: &mut BuildRunner<'_, '_>,
604 plan: &mut BuildPlan,
605 event: Message,
606 ) -> Result<(), ErrorToHandle> {
607 let warning_handling = build_runner.bcx.gctx.warning_handling()?;
608 match event {
609 Message::Run(id, cmd) => {
610 build_runner
611 .bcx
612 .gctx
613 .shell()
614 .verbose(|c| c.status("Running", &cmd))?;
615 self.timings.unit_start(id, self.active[&id].clone());
616 }
617 Message::BuildPlanMsg(module_name, cmd, filenames) => {
618 plan.update(&module_name, &cmd, &filenames)?;
619 }
620 Message::Stdout(out) => {
621 writeln!(build_runner.bcx.gctx.shell().out(), "{}", out)?;
622 }
623 Message::Stderr(err) => {
624 let mut shell = build_runner.bcx.gctx.shell();
625 shell.print_ansi_stderr(err.as_bytes())?;
626 shell.err().write_all(b"\n")?;
627 }
628 Message::Diagnostic {
629 id,
630 level,
631 diag,
632 fixable,
633 } => {
634 let emitted = self.diag_dedupe.emit_diag(&diag)?;
635 if level == "warning" {
636 self.bump_warning_count(id, emitted, fixable);
637 }
638 if level == "error" {
639 let cnts = self.warning_count.entry(id).or_default();
640 // If there is an error, the `cargo fix` message should not show
641 cnts.disallow_fixable();
642 }
643 }
644 Message::Warning { id, warning } => {
645 if warning_handling != WarningHandling::Allow {
646 build_runner.bcx.gctx.shell().warn(warning)?;
647 }
648 self.bump_warning_count(id, true, false);
649 }
650 Message::WarningCount {
651 id,
652 emitted,
653 fixable,
654 } => {
655 self.bump_warning_count(id, emitted, fixable);
656 }
657 Message::FixDiagnostic(msg) => {
658 self.print.print(&msg)?;
659 }
660 Message::Finish(id, artifact, result) => {
661 let unit = match artifact {
662 // If `id` has completely finished we remove it
663 // from the `active` map ...
664 Artifact::All => {
665 trace!("end: {:?}", id);
666 self.finished += 1;
667 self.report_warning_count(
668 build_runner,
669 id,
670 &build_runner.bcx.rustc().workspace_wrapper,
671 );
672 self.active.remove(&id).unwrap()
673 }
674 // ... otherwise if it hasn't finished we leave it
675 // in there as we'll get another `Finish` later on.
676 Artifact::Metadata => {
677 trace!("end (meta): {:?}", id);
678 self.active[&id].clone()
679 }
680 };
681 debug!("end ({:?}): {:?}", unit, result);
682 match result {
683 Ok(()) => self.finish(id, &unit, artifact, build_runner)?,
684 Err(_) if build_runner.bcx.unit_can_fail_for_docscraping(&unit) => {
685 build_runner
686 .failed_scrape_units
687 .lock()
688 .unwrap()
689 .insert(build_runner.files().metadata(&unit).unit_id());
690 self.queue.finish(&unit, &artifact);
691 }
692 Err(error) => {
693 let show_warnings = true;
694 self.emit_log_messages(&unit, build_runner, show_warnings)?;
695 self.back_compat_notice(build_runner, &unit)?;
696 return Err(ErrorToHandle {
697 error,
698 print_always: true,
699 });
700 }
701 }
702 }
703 Message::FutureIncompatReport(id, items) => {
704 let unit = &self.active[&id];
705 let package_id = unit.pkg.package_id();
706 let is_local = unit.is_local();
707 self.per_package_future_incompat_reports
708 .push(FutureIncompatReportPackage {
709 package_id,
710 is_local,
711 items,
712 });
713 }
714 Message::Token(acquired_token) => {
715 let token = acquired_token.context("failed to acquire jobserver token")?;
716 self.tokens.push(token);
717 }
718 Message::SectionTiming(id, section) => {
719 self.timings.unit_section_timing(id, §ion);
720 }
721 }
722
723 Ok(())
724 }
725
726 // This will also tick the progress bar as appropriate
727 fn wait_for_events(&mut self) -> Vec<Message> {
728 // Drain all events at once to avoid displaying the progress bar
729 // unnecessarily. If there's no events we actually block waiting for
730 // an event, but we keep a "heartbeat" going to allow `record_cpu`
731 // to run above to calculate CPU usage over time. To do this we
732 // listen for a message with a timeout, and on timeout we run the
733 // previous parts of the loop again.
734 let mut events = self.messages.try_pop_all();
735 if events.is_empty() {
736 loop {
737 self.tick_progress();
738 self.tokens.truncate(self.active.len() - 1);
739 match self.messages.pop(Duration::from_millis(500)) {
740 Some(message) => {
741 events.push(message);
742 break;
743 }
744 None => continue,
745 }
746 }
747 }
748 events
749 }
750
751 /// This is the "main" loop, where Cargo does all work to run the
752 /// compiler.
753 ///
754 /// This returns an Option to prevent the use of `?` on `Result` types
755 /// because it is important for the loop to carefully handle errors.
756 fn drain_the_queue<'s>(
757 mut self,
758 build_runner: &mut BuildRunner<'_, '_>,
759 plan: &mut BuildPlan,
760 scope: &'s Scope<'s, '_>,
761 jobserver_helper: &HelperThread,
762 ) -> Option<anyhow::Error> {
763 trace!("queue: {:#?}", self.queue);
764
765 // Iteratively execute the entire dependency graph. Each turn of the
766 // loop starts out by scheduling as much work as possible (up to the
767 // maximum number of parallel jobs we have tokens for). A local queue
768 // is maintained separately from the main dependency queue as one
769 // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
770 // in one package).
771 //
772 // After a job has finished we update our internal state if it was
773 // successful and otherwise wait for pending work to finish if it failed
774 // and then immediately return (or keep going, if requested by the build
775 // config).
776 let mut errors = ErrorsDuringDrain { count: 0 };
777 // CAUTION! Do not use `?` or break out of the loop early. Every error
778 // must be handled in such a way that the loop is still allowed to
779 // drain event messages.
780 loop {
781 if errors.count == 0 || build_runner.bcx.build_config.keep_going {
782 if let Err(e) = self.spawn_work_if_possible(build_runner, jobserver_helper, scope) {
783 self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
784 }
785 }
786
787 // If after all that we're not actually running anything then we're
788 // done!
789 if self.active.is_empty() {
790 break;
791 }
792
793 // And finally, before we block waiting for the next event, drop any
794 // excess tokens we may have accidentally acquired. Due to how our
795 // jobserver interface is architected we may acquire a token that we
796 // don't actually use, and if this happens just relinquish it back
797 // to the jobserver itself.
798 for event in self.wait_for_events() {
799 if let Err(event_err) = self.handle_event(build_runner, plan, event) {
800 self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, event_err);
801 }
802 }
803 }
804 self.progress.clear();
805
806 let profile_name = build_runner.bcx.build_config.requested_profile;
807 // NOTE: this may be a bit inaccurate, since this may not display the
808 // profile for what was actually built. Profile overrides can change
809 // these settings, and in some cases different targets are built with
810 // different profiles. To be accurate, it would need to collect a
811 // list of Units built, and maybe display a list of the different
812 // profiles used. However, to keep it simple and compatible with old
813 // behavior, we just display what the base profile is.
814 let profile = build_runner.bcx.profiles.base_profile();
815 let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
816 "unoptimized"
817 } else {
818 "optimized"
819 });
820 if profile.debuginfo.is_turned_on() {
821 opt_type += " + debuginfo";
822 }
823
824 let time_elapsed = util::elapsed(build_runner.bcx.gctx.creation_time().elapsed());
825 if let Err(e) = self.timings.finished(build_runner, &errors.to_error()) {
826 self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
827 }
828 if build_runner.bcx.build_config.emit_json() {
829 let mut shell = build_runner.bcx.gctx.shell();
830 let msg = machine_message::BuildFinished {
831 success: errors.count == 0,
832 }
833 .to_json_string();
834 if let Err(e) = writeln!(shell.out(), "{}", msg) {
835 self.handle_error(&mut shell, &mut errors, e);
836 }
837 }
838
839 if let Some(error) = errors.to_error() {
840 // Any errors up to this point have already been printed via the
841 // `display_error` inside `handle_error`.
842 Some(anyhow::Error::new(AlreadyPrintedError::new(error)))
843 } else if self.queue.is_empty() && self.pending_queue.is_empty() {
844 let profile_link = build_runner.bcx.gctx.shell().err_hyperlink(
845 "https://doc.rust-lang.org/cargo/reference/profiles.html#default-profiles",
846 );
847 let message = format!(
848 "{profile_link}`{profile_name}` profile [{opt_type}]{profile_link:#} target(s) in {time_elapsed}",
849 );
850 if !build_runner.bcx.build_config.build_plan {
851 // It doesn't really matter if this fails.
852 let _ = build_runner.bcx.gctx.shell().status("Finished", message);
853 future_incompat::save_and_display_report(
854 build_runner.bcx,
855 &self.per_package_future_incompat_reports,
856 );
857 }
858
859 None
860 } else {
861 debug!("queue: {:#?}", self.queue);
862 Some(internal("finished with jobs still left in the queue"))
863 }
864 }
865
866 fn handle_error(
867 &mut self,
868 shell: &mut Shell,
869 err_state: &mut ErrorsDuringDrain,
870 new_err: impl Into<ErrorToHandle>,
871 ) {
872 let new_err = new_err.into();
873 if new_err.print_always || err_state.count == 0 {
874 crate::display_error(&new_err.error, shell);
875 if err_state.count == 0 && !self.active.is_empty() {
876 self.progress.indicate_error();
877 let _ = shell.warn("build failed, waiting for other jobs to finish...");
878 }
879 err_state.count += 1;
880 } else {
881 tracing::warn!("{:?}", new_err.error);
882 }
883 }
884
885 // This also records CPU usage and marks concurrency; we roughly want to do
886 // this as often as we spin on the events receiver (at least every 500ms or
887 // so).
888 fn tick_progress(&mut self) {
889 // Record some timing information if `--timings` is enabled, and
890 // this'll end up being a noop if we're not recording this
891 // information.
892 self.timings.mark_concurrency(
893 self.active.len(),
894 self.pending_queue.len(),
895 self.queue.len(),
896 );
897 self.timings.record_cpu();
898
899 let active_names = self
900 .active
901 .values()
902 .map(|u| self.name_for_progress(u))
903 .collect::<Vec<_>>();
904 let _ = self.progress.tick_now(
905 self.finished,
906 self.total_units,
907 &format!(": {}", active_names.join(", ")),
908 );
909 }
910
911 fn name_for_progress(&self, unit: &Unit) -> String {
912 let pkg_name = unit.pkg.name();
913 let target_name = unit.target.name();
914 match unit.mode {
915 CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
916 CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
917 CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
918 TargetKind::Lib(_) => format!("{}(test)", target_name),
919 TargetKind::CustomBuild => panic!("cannot test build script"),
920 TargetKind::Bin => format!("{}(bin test)", target_name),
921 TargetKind::Test => format!("{}(test)", target_name),
922 TargetKind::Bench => format!("{}(bench)", target_name),
923 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
924 format!("{}(example test)", target_name)
925 }
926 },
927 _ => match unit.target.kind() {
928 TargetKind::Lib(_) => pkg_name.to_string(),
929 TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
930 TargetKind::Bin => format!("{}(bin)", target_name),
931 TargetKind::Test => format!("{}(test)", target_name),
932 TargetKind::Bench => format!("{}(bench)", target_name),
933 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
934 format!("{}(example)", target_name)
935 }
936 },
937 }
938 }
939
940 /// Executes a job.
941 ///
942 /// Fresh jobs block until finished (which should be very fast!), Dirty
943 /// jobs will spawn a thread in the background and return immediately.
944 fn run<'s>(
945 &mut self,
946 unit: &Unit,
947 job: Job,
948 build_runner: &BuildRunner<'_, '_>,
949 scope: &'s Scope<'s, '_>,
950 ) {
951 let id = JobId(self.next_id);
952 self.next_id = self.next_id.checked_add(1).unwrap();
953
954 debug!("start {}: {:?}", id, unit);
955
956 assert!(self.active.insert(id, unit.clone()).is_none());
957
958 let messages = self.messages.clone();
959 let is_fresh = job.freshness().is_fresh();
960 let rmeta_required = build_runner.rmeta_required(unit);
961
962 let doit = move |diag_dedupe| {
963 let state = JobState::new(id, messages, diag_dedupe, rmeta_required);
964 state.run_to_finish(job);
965 };
966
967 match is_fresh {
968 true => {
969 self.timings.add_fresh();
970 // Running a fresh job on the same thread is often much faster than spawning a new
971 // thread to run the job.
972 doit(Some(&self.diag_dedupe));
973 }
974 false => {
975 self.timings.add_dirty();
976 scope.spawn(move || doit(None));
977 }
978 }
979 }
980
981 fn emit_log_messages(
982 &self,
983 unit: &Unit,
984 build_runner: &mut BuildRunner<'_, '_>,
985 show_warnings: bool,
986 ) -> CargoResult<()> {
987 let outputs = build_runner.build_script_outputs.lock().unwrap();
988 let Some(metadata_vec) = build_runner.find_build_script_metadatas(unit) else {
989 return Ok(());
990 };
991 let bcx = &mut build_runner.bcx;
992 for metadata in metadata_vec {
993 if let Some(output) = outputs.get(metadata) {
994 if !output.log_messages.is_empty()
995 && (show_warnings
996 || output
997 .log_messages
998 .iter()
999 .any(|(severity, _)| *severity == Severity::Error))
1000 {
1001 let msg_with_package =
1002 |msg: &str| format!("{}@{}: {}", unit.pkg.name(), unit.pkg.version(), msg);
1003
1004 for (severity, message) in output.log_messages.iter() {
1005 match severity {
1006 Severity::Error => {
1007 bcx.gctx.shell().error(msg_with_package(message))?;
1008 }
1009 Severity::Warning => {
1010 bcx.gctx.shell().warn(msg_with_package(message))?;
1011 }
1012 }
1013 }
1014 }
1015 }
1016 }
1017
1018 Ok(())
1019 }
1020
1021 fn bump_warning_count(&mut self, id: JobId, emitted: bool, fixable: bool) {
1022 let cnts = self.warning_count.entry(id).or_default();
1023 cnts.total += 1;
1024 if !emitted {
1025 cnts.duplicates += 1;
1026 // Don't add to fixable if it's already been emitted
1027 } else if fixable {
1028 // Do not add anything to the fixable warning count if
1029 // is `NotAllowed` since that indicates there was an
1030 // error while building this `Unit`
1031 if cnts.fixable_allowed() {
1032 cnts.fixable = match cnts.fixable {
1033 FixableWarnings::NotAllowed => FixableWarnings::NotAllowed,
1034 FixableWarnings::Zero => FixableWarnings::Positive(1),
1035 FixableWarnings::Positive(fixable) => FixableWarnings::Positive(fixable + 1),
1036 };
1037 }
1038 }
1039 }
1040
1041 /// Displays a final report of the warnings emitted by a particular job.
1042 fn report_warning_count(
1043 &mut self,
1044 runner: &mut BuildRunner<'_, '_>,
1045 id: JobId,
1046 rustc_workspace_wrapper: &Option<PathBuf>,
1047 ) {
1048 let gctx = runner.bcx.gctx;
1049 let count = match self.warning_count.get(&id) {
1050 // An error could add an entry for a `Unit`
1051 // with 0 warnings but having fixable
1052 // warnings be disallowed
1053 Some(count) if count.total > 0 => count,
1054 None | Some(_) => return,
1055 };
1056 runner.compilation.warning_count += count.total;
1057 let unit = &self.active[&id];
1058 let mut message = descriptive_pkg_name(&unit.pkg.name(), &unit.target, &unit.mode);
1059 message.push_str(" generated ");
1060 match count.total {
1061 1 => message.push_str("1 warning"),
1062 n => {
1063 let _ = write!(message, "{} warnings", n);
1064 }
1065 };
1066 match count.duplicates {
1067 0 => {}
1068 1 => message.push_str(" (1 duplicate)"),
1069 n => {
1070 let _ = write!(message, " ({} duplicates)", n);
1071 }
1072 }
1073 // Only show the `cargo fix` message if its a local `Unit`
1074 if unit.is_local() {
1075 // Do not show this if there are any errors or no fixable warnings
1076 if let FixableWarnings::Positive(fixable) = count.fixable {
1077 // `cargo fix` doesn't have an option for custom builds
1078 if !unit.target.is_custom_build() {
1079 // To make sure the correct command is shown for `clippy` we
1080 // check if `RUSTC_WORKSPACE_WRAPPER` is set and pointing towards
1081 // `clippy-driver`.
1082 let clippy = std::ffi::OsStr::new("clippy-driver");
1083 let command = match rustc_workspace_wrapper.as_ref().and_then(|x| x.file_stem())
1084 {
1085 Some(wrapper) if wrapper == clippy => "cargo clippy --fix",
1086 _ => "cargo fix",
1087 };
1088 let mut args = {
1089 let named = unit.target.description_named();
1090 // if its a lib we need to add the package to fix
1091 if unit.target.is_lib() {
1092 format!("{} -p {}", named, unit.pkg.name())
1093 } else {
1094 named
1095 }
1096 };
1097 if unit.mode.is_rustc_test()
1098 && !(unit.target.is_test() || unit.target.is_bench())
1099 {
1100 args.push_str(" --tests");
1101 }
1102 let mut suggestions = format!("{} suggestion", fixable);
1103 if fixable > 1 {
1104 suggestions.push_str("s")
1105 }
1106 let _ = write!(
1107 message,
1108 " (run `{command} --{args}` to apply {suggestions})"
1109 );
1110 }
1111 }
1112 }
1113 // Errors are ignored here because it is tricky to handle them
1114 // correctly, and they aren't important.
1115 let _ = gctx.shell().warn(message);
1116 }
1117
1118 fn finish(
1119 &mut self,
1120 id: JobId,
1121 unit: &Unit,
1122 artifact: Artifact,
1123 build_runner: &mut BuildRunner<'_, '_>,
1124 ) -> CargoResult<()> {
1125 if unit.mode.is_run_custom_build() {
1126 self.emit_log_messages(
1127 unit,
1128 build_runner,
1129 unit.show_warnings(build_runner.bcx.gctx),
1130 )?;
1131 }
1132 let unlocked = self.queue.finish(unit, &artifact);
1133 match artifact {
1134 Artifact::All => self.timings.unit_finished(id, unlocked),
1135 Artifact::Metadata => self.timings.unit_rmeta_finished(id, unlocked),
1136 }
1137 Ok(())
1138 }
1139
1140 // This isn't super trivial because we don't want to print loads and
1141 // loads of information to the console, but we also want to produce a
1142 // faithful representation of what's happening. This is somewhat nuanced
1143 // as a package can start compiling *very* early on because of custom
1144 // build commands and such.
1145 //
1146 // In general, we try to print "Compiling" for the first nontrivial task
1147 // run for a package, regardless of when that is. We then don't print
1148 // out any more information for a package after we've printed it once.
1149 fn note_working_on(
1150 &mut self,
1151 gctx: &GlobalContext,
1152 ws_root: &Path,
1153 unit: &Unit,
1154 fresh: &Freshness,
1155 ) -> CargoResult<()> {
1156 if (self.compiled.contains(&unit.pkg.package_id())
1157 && !unit.mode.is_doc()
1158 && !unit.mode.is_doc_scrape())
1159 || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
1160 || (self.scraped.contains(&unit.pkg.package_id()) && unit.mode.is_doc_scrape())
1161 {
1162 return Ok(());
1163 }
1164
1165 match fresh {
1166 // Any dirty stage which runs at least one command gets printed as
1167 // being a compiled package.
1168 Dirty(dirty_reason) => {
1169 if !dirty_reason.is_fresh_build() {
1170 gctx.shell()
1171 .verbose(|shell| dirty_reason.present_to(shell, unit, ws_root))?;
1172 }
1173
1174 if unit.mode.is_doc() {
1175 self.documented.insert(unit.pkg.package_id());
1176 gctx.shell().status("Documenting", &unit.pkg)?;
1177 } else if unit.mode.is_doc_test() {
1178 // Skip doc test.
1179 } else if unit.mode.is_doc_scrape() {
1180 self.scraped.insert(unit.pkg.package_id());
1181 gctx.shell().status("Scraping", &unit.pkg)?;
1182 } else {
1183 self.compiled.insert(unit.pkg.package_id());
1184 if unit.mode.is_check() {
1185 gctx.shell().status("Checking", &unit.pkg)?;
1186 } else {
1187 gctx.shell().status("Compiling", &unit.pkg)?;
1188 }
1189 }
1190 }
1191 Fresh => {
1192 // If doc test are last, only print "Fresh" if nothing has been printed.
1193 if self.counts[&unit.pkg.package_id()] == 0
1194 && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
1195 {
1196 self.compiled.insert(unit.pkg.package_id());
1197 gctx.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
1198 }
1199 }
1200 }
1201 Ok(())
1202 }
1203
1204 fn back_compat_notice(
1205 &self,
1206 build_runner: &BuildRunner<'_, '_>,
1207 unit: &Unit,
1208 ) -> CargoResult<()> {
1209 if unit.pkg.name() != "diesel"
1210 || unit.pkg.version() >= &Version::new(1, 4, 8)
1211 || build_runner.bcx.ws.resolve_behavior() == ResolveBehavior::V1
1212 || !unit.pkg.package_id().source_id().is_registry()
1213 || !unit.features.is_empty()
1214 {
1215 return Ok(());
1216 }
1217 if !build_runner
1218 .bcx
1219 .unit_graph
1220 .keys()
1221 .any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
1222 {
1223 return Ok(());
1224 }
1225 build_runner.bcx.gctx.shell().note(
1226 "\
1227This error may be due to an interaction between diesel and Cargo's new
1228feature resolver. Try updating to diesel 1.4.8 to fix this error.
1229",
1230 )?;
1231 Ok(())
1232 }
1233}
1234
1235impl ErrorsDuringDrain {
1236 fn to_error(&self) -> Option<anyhow::Error> {
1237 match self.count {
1238 0 => None,
1239 1 => Some(format_err!("1 job failed")),
1240 n => Some(format_err!("{} jobs failed", n)),
1241 }
1242 }
1243}