cargo/core/
package.rs

1use std::cell::{Cell, Ref, RefCell, RefMut};
2use std::cmp::Ordering;
3use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
4use std::fmt;
5use std::hash;
6use std::mem;
7use std::path::{Path, PathBuf};
8use std::rc::Rc;
9use std::time::{Duration, Instant};
10
11use anyhow::Context as _;
12use bytesize::ByteSize;
13use cargo_util_schemas::manifest::RustVersion;
14use curl::easy::Easy;
15use curl::multi::{EasyHandle, Multi};
16use lazycell::LazyCell;
17use semver::Version;
18use serde::Serialize;
19use tracing::debug;
20
21use crate::core::compiler::{CompileKind, RustcTargetData};
22use crate::core::dependency::DepKind;
23use crate::core::resolver::features::ForceAllTargets;
24use crate::core::resolver::{HasDevUnits, Resolve};
25use crate::core::{
26    CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
27    SourceId, Target,
28};
29use crate::core::{Summary, Workspace};
30use crate::sources::source::{MaybePackage, SourceMap};
31use crate::util::cache_lock::{CacheLock, CacheLockMode};
32use crate::util::errors::{CargoResult, HttpNotSuccessful};
33use crate::util::interning::InternedString;
34use crate::util::network::http::http_handle_and_timeout;
35use crate::util::network::http::HttpTimeout;
36use crate::util::network::retry::{Retry, RetryResult};
37use crate::util::network::sleep::SleepTracker;
38use crate::util::{self, internal, GlobalContext, Progress, ProgressStyle};
39
/// Information about a package that is available somewhere in the file system.
///
/// A package is a `Cargo.toml` file plus all the files that are part of it.
#[derive(Clone)]
pub struct Package {
    /// Shared payload; `Rc` makes cloning a `Package` cheap (refcount bump).
    inner: Rc<PackageInner>,
}
47
/// The data behind a [`Package`], shared through an `Rc`.
#[derive(Clone)]
// TODO: is `manifest_path` a relic?
struct PackageInner {
    /// The package's manifest.
    manifest: Manifest,
    /// The root of the package.
    manifest_path: PathBuf,
}
56
57impl Ord for Package {
58    fn cmp(&self, other: &Package) -> Ordering {
59        self.package_id().cmp(&other.package_id())
60    }
61}
62
63impl PartialOrd for Package {
64    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
65        Some(self.cmp(other))
66    }
67}
68
/// A Package in a form where `Serialize` can be derived.
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    targets: Vec<Target>,
    /// Feature name mapped to the stringified feature values it enables.
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    /// Custom metadata from the manifest, if any (see `custom_metadata()`).
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    /// Omitted from the serialized output entirely (not emitted as `null`)
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
}
99
impl Package {
    /// Creates a package from a manifest and its location.
    pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest,
                manifest_path: manifest_path.to_path_buf(),
            }),
        }
    }

    /// Gets the manifest dependencies.
    pub fn dependencies(&self) -> &[Dependency] {
        self.manifest().dependencies()
    }
    /// Gets the manifest.
    pub fn manifest(&self) -> &Manifest {
        &self.inner.manifest
    }
    /// Gets the manifest, mutably.
    ///
    /// `Rc::make_mut` clones the shared inner data first if other `Package`
    /// handles still point at it (copy-on-write).
    pub fn manifest_mut(&mut self) -> &mut Manifest {
        &mut Rc::make_mut(&mut self.inner).manifest
    }
    /// Gets the path to the manifest.
    pub fn manifest_path(&self) -> &Path {
        &self.inner.manifest_path
    }
    /// Gets the name of the package.
    pub fn name(&self) -> InternedString {
        self.package_id().name()
    }
    /// Gets the `PackageId` object for the package (fully defines a package).
    pub fn package_id(&self) -> PackageId {
        self.manifest().package_id()
    }
    /// Gets the root folder of the package (the directory containing
    /// `Cargo.toml`; `unwrap` is fine since a manifest path always has a
    /// parent).
    pub fn root(&self) -> &Path {
        self.manifest_path().parent().unwrap()
    }
    /// Gets the summary for the package.
    pub fn summary(&self) -> &Summary {
        self.manifest().summary()
    }
    /// Gets the targets specified in the manifest.
    pub fn targets(&self) -> &[Target] {
        self.manifest().targets()
    }
    /// Gets the library crate for this package, if it exists.
    pub fn library(&self) -> Option<&Target> {
        self.targets().iter().find(|t| t.is_lib())
    }
    /// Gets the current package version.
    pub fn version(&self) -> &Version {
        self.package_id().version()
    }
    /// Gets the package authors.
    pub fn authors(&self) -> &Vec<String> {
        &self.manifest().metadata().authors
    }

    /// Returns `None` if the package is set to publish.
    /// Returns `Some(allowed_registries)` if publishing is limited to specified
    /// registries or if package is set to not publish.
    pub fn publish(&self) -> &Option<Vec<String>> {
        self.manifest().publish()
    }
    /// Returns `true` if this package is a proc-macro.
    pub fn proc_macro(&self) -> bool {
        self.targets().iter().any(|target| target.proc_macro())
    }
    /// Gets the package's minimum Rust version.
    pub fn rust_version(&self) -> Option<&RustVersion> {
        self.manifest().rust_version()
    }

    /// Returns `true` if the package uses a custom build script for any target.
    pub fn has_custom_build(&self) -> bool {
        self.targets().iter().any(|t| t.is_custom_build())
    }

    /// Returns a new `Package` whose manifest has had `to_replace` sources
    /// mapped to `replace_with`. The manifest path is carried over unchanged.
    pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest: self.manifest().clone().map_source(to_replace, replace_with),
                manifest_path: self.manifest_path().to_owned(),
            }),
        }
    }

    /// Builds the [`SerializedPackage`] view of this package, suitable for
    /// JSON output (e.g. `cargo metadata`).
    pub fn serialized(
        &self,
        unstable_flags: &CliUnstable,
        cargo_features: &Features,
    ) -> SerializedPackage {
        let summary = self.manifest().summary();
        let package_id = summary.package_id();
        let manmeta = self.manifest().metadata();
        // Filter out metabuild targets. They are an internal implementation
        // detail that is probably not relevant externally. There's also not a
        // real path to show in `src_path`, and this avoids changing the format.
        let targets: Vec<Target> = self
            .manifest()
            .targets()
            .iter()
            .filter(|t| t.src_path().is_path())
            .cloned()
            .collect();
        // Convert Vec<FeatureValue> to Vec<InternedString>
        let crate_features = summary
            .features()
            .iter()
            .map(|(k, v)| {
                (
                    *k,
                    v.iter()
                        .map(|fv| InternedString::new(&fv.to_string()))
                        .collect(),
                )
            })
            .collect();

        SerializedPackage {
            name: package_id.name(),
            version: package_id.version().clone(),
            id: package_id.to_spec(),
            license: manmeta.license.clone(),
            license_file: manmeta.license_file.clone(),
            description: manmeta.description.clone(),
            source: summary.source_id(),
            dependencies: summary
                .dependencies()
                .iter()
                .map(|dep| dep.serialized(unstable_flags, cargo_features))
                .collect(),
            targets,
            features: crate_features,
            manifest_path: self.manifest_path().to_path_buf(),
            metadata: self.manifest().custom_metadata().cloned(),
            authors: manmeta.authors.clone(),
            categories: manmeta.categories.clone(),
            keywords: manmeta.keywords.clone(),
            readme: manmeta.readme.clone(),
            repository: manmeta.repository.clone(),
            homepage: manmeta.homepage.clone(),
            documentation: manmeta.documentation.clone(),
            edition: self.manifest().edition().to_string(),
            links: self.manifest().links().map(|s| s.to_owned()),
            metabuild: self.manifest().metabuild().cloned(),
            publish: self.publish().as_ref().cloned(),
            default_run: self.manifest().default_run().map(|s| s.to_owned()),
            rust_version: self.rust_version().cloned(),
        }
    }
}
254
255impl fmt::Display for Package {
256    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257        write!(f, "{}", self.summary().package_id())
258    }
259}
260
261impl fmt::Debug for Package {
262    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263        f.debug_struct("Package")
264            .field("id", &self.summary().package_id())
265            .field("..", &"..")
266            .finish()
267    }
268}
269
270impl PartialEq for Package {
271    fn eq(&self, other: &Package) -> bool {
272        self.package_id() == other.package_id()
273    }
274}
275
276impl Eq for Package {}
277
278impl hash::Hash for Package {
279    fn hash<H: hash::Hasher>(&self, into: &mut H) {
280        self.package_id().hash(into)
281    }
282}
283
/// A set of packages, with the intent to download.
///
/// This is primarily used to convert a set of `PackageId`s to `Package`s. It
/// will download as needed, or used the cached download if available.
pub struct PackageSet<'gctx> {
    /// One lazily-filled slot per package; filled once the package is
    /// downloaded (or found already cached).
    packages: HashMap<PackageId, LazyCell<Package>>,
    /// Sources used to look up and download each package.
    sources: RefCell<SourceMap<'gctx>>,
    /// Handle to cargo's global context (config, shell, locks).
    gctx: &'gctx GlobalContext,
    /// The libcurl multi handle driving all concurrent transfers.
    multi: Multi,
    /// Used to prevent reusing the `PackageSet` to download twice.
    downloading: Cell<bool>,
    /// Whether or not to use curl HTTP/2 multiplexing.
    multiplexing: bool,
}
298
/// Helper for downloading crates.
pub struct Downloads<'a, 'gctx> {
    /// The `PackageSet` these downloads belong to (holds the slots to fill).
    set: &'a PackageSet<'gctx>,
    /// When a download is started, it is added to this map. The key is a
    /// "token" (see `Download::token`). It is removed once the download is
    /// finished.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Set of packages currently being downloaded. This should stay in sync
    /// with `pending`.
    pending_ids: HashSet<PackageId>,
    /// Downloads that have failed and are waiting to retry again later.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// The final result of each download. A pair `(token, result)`. This is a
    /// temporary holding area, needed because curl can report multiple
    /// downloads at once, but the main loop (`wait`) is written to only
    /// handle one at a time.
    results: Vec<(usize, Result<(), curl::Error>)>,
    /// The next ID to use for creating a token (see `Download::token`).
    next: usize,
    /// Progress bar.
    progress: RefCell<Option<Progress<'gctx>>>,
    /// Number of downloads that have successfully finished.
    downloads_finished: usize,
    /// Total bytes for all successfully downloaded packages.
    downloaded_bytes: u64,
    /// Size (in bytes) and package name of the largest downloaded package.
    largest: (u64, InternedString),
    /// Time when downloading started.
    start: Instant,
    /// Indicates *all* downloads were successful.
    success: bool,

    /// Timeout management, both of timeout thresholds as well as whether or not
    /// our connection has timed out (and accompanying message if it has).
    ///
    /// Note that timeout management is done manually here instead of in libcurl
    /// because we want to apply timeouts to an entire batch of operations, not
    /// any one particular single operation.
    timeout: HttpTimeout,
    /// Last time bytes were received.
    updated_at: Cell<Instant>,
    /// This is a slow-speed check. It is reset to `now + timeout_duration`
    /// every time at least `threshold` bytes are received. If the current
    /// time ever exceeds `next_speed_check`, then give up and report a
    /// timeout error.
    next_speed_check: Cell<Instant>,
    /// This is the slow-speed threshold byte count. It starts at the
    /// configured threshold value (default 10), and is decremented by the
    /// number of bytes received in each chunk. If it is <= zero, the
    /// threshold has been met and data is being received fast enough not to
    /// trigger a timeout; reset `next_speed_check` and set this back to the
    /// configured threshold.
    next_speed_check_bytes_threshold: Cell<u64>,
    /// Global filesystem lock to ensure only one Cargo is downloading at a
    /// time.
    _lock: CacheLock<'gctx>,
}
356
/// State for one in-flight crate download.
struct Download<'gctx> {
    /// The token for this download, used as the key of the `Downloads::pending` map
    /// and stored in `EasyHandle` as well.
    token: usize,

    /// The package that we're downloading.
    id: PackageId,

    /// Actual downloaded data, updated throughout the lifetime of this download.
    data: RefCell<Vec<u8>>,

    /// HTTP headers for debugging.
    headers: RefCell<Vec<String>>,

    /// The URL that we're downloading from, cached here for error messages and
    /// reenqueuing.
    url: String,

    /// A descriptive string to print when we've finished downloading this crate.
    descriptor: String,

    /// Statistics updated from the progress callback in libcurl.
    total: Cell<u64>,
    current: Cell<u64>,

    /// The moment we started this transfer at.
    start: Instant,
    /// Set to a descriptive message when the transfer was aborted by the
    /// speed-check timeout logic.
    timed_out: Cell<Option<String>>,

    /// Logic used to track retrying this download if it's a spurious failure.
    retry: Retry<'gctx>,
}
389
impl<'gctx> PackageSet<'gctx> {
    /// Creates a `PackageSet` with one empty slot per `PackageId`, ready to
    /// download from the given `sources`.
    pub fn new(
        package_ids: &[PackageId],
        sources: SourceMap<'gctx>,
        gctx: &'gctx GlobalContext,
    ) -> CargoResult<PackageSet<'gctx>> {
        // We've enabled the `http2` feature of `curl` in Cargo, so treat
        // failures here as fatal as it would indicate a build-time problem.
        let mut multi = Multi::new();
        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
        multi
            .pipelining(false, multiplexing)
            .context("failed to enable multiplexing/pipelining in curl")?;

        // let's not flood crates.io with connections
        multi.set_max_host_connections(2)?;

        Ok(PackageSet {
            packages: package_ids
                .iter()
                .map(|&id| (id, LazyCell::new()))
                .collect(),
            sources: RefCell::new(sources),
            gctx,
            multi,
            downloading: Cell::new(false),
            multiplexing,
        })
    }

    /// Iterates over the IDs of all packages in this set (downloaded or not).
    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
        self.packages.keys().cloned()
    }

    /// Iterates over the packages that have already been downloaded/filled.
    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.packages.values().filter_map(|p| p.borrow())
    }

    /// Starts a download session.
    ///
    /// Panics (via the assert) if a session was already started on this set —
    /// `downloading` enforces single use.
    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
        assert!(!self.downloading.replace(true));
        let timeout = HttpTimeout::new(self.gctx)?;
        Ok(Downloads {
            start: Instant::now(),
            set: self,
            next: 0,
            pending: HashMap::new(),
            pending_ids: HashSet::new(),
            sleeping: SleepTracker::new(),
            results: Vec::new(),
            progress: RefCell::new(Some(Progress::with_style(
                "Downloading",
                ProgressStyle::Ratio,
                self.gctx,
            ))),
            downloads_finished: 0,
            downloaded_bytes: 0,
            largest: (0, InternedString::new("")),
            success: false,
            updated_at: Cell::new(Instant::now()),
            timeout,
            next_speed_check: Cell::new(Instant::now()),
            next_speed_check_bytes_threshold: Cell::new(0),
            // Hold the download lock for the lifetime of the session.
            _lock: self
                .gctx
                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
        })
    }

    /// Returns the package for `id`, downloading it first if necessary.
    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
        // Fast path: already downloaded and cached in its slot.
        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) {
            return Ok(pkg);
        }
        Ok(self.get_many(Some(id))?.remove(0))
    }

    /// Downloads (as needed) and returns all packages for `ids`.
    ///
    /// NOTE(review): the cache lock is also acquired by `enable_download`
    /// below — presumably the lock is re-entrant/refcounted; confirm before
    /// removing either acquisition.
    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
        let mut pkgs = Vec::new();
        let _lock = self
            .gctx
            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
        let mut downloads = self.enable_download()?;
        for id in ids {
            // `start` returns `Some(pkg)` for packages that are already
            // available; queued ones come back from `wait` below.
            pkgs.extend(downloads.start(id)?);
        }
        while downloads.remaining() > 0 {
            pkgs.push(downloads.wait()?);
        }
        downloads.success = true;
        drop(downloads);

        let mut deferred = self.gctx.deferred_global_last_use()?;
        deferred.save_no_error(self.gctx);
        Ok(pkgs)
    }

    /// Downloads any packages accessible from the given root ids.
    #[tracing::instrument(skip_all)]
    pub fn download_accessible(
        &self,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'gctx>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        /// Recursively walks the resolve graph from `pkg_id`, recording every
        /// `(package, compile kind)` pair reachable through active
        /// dependencies. `used` doubles as the visited set.
        fn collect_used_deps(
            used: &mut BTreeSet<(PackageId, CompileKind)>,
            resolve: &Resolve,
            pkg_id: PackageId,
            has_dev_units: HasDevUnits,
            requested_kind: CompileKind,
            target_data: &RustcTargetData<'_>,
            force_all_targets: ForceAllTargets,
        ) -> CargoResult<()> {
            // Already visited with this kind; stop recursing.
            if !used.insert((pkg_id, requested_kind)) {
                return Ok(());
            }
            let requested_kinds = &[requested_kind];
            let filtered_deps = PackageSet::filter_deps(
                pkg_id,
                resolve,
                has_dev_units,
                requested_kinds,
                target_data,
                force_all_targets,
            );
            for (pkg_id, deps) in filtered_deps {
                collect_used_deps(
                    used,
                    resolve,
                    pkg_id,
                    has_dev_units,
                    requested_kind,
                    target_data,
                    force_all_targets,
                )?;
                // Artifact dependencies may pin their own compile target,
                // which must also be walked.
                let artifact_kinds = deps.iter().filter_map(|dep| {
                    Some(
                        dep.artifact()?
                            .target()?
                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
                    )
                });
                for artifact_kind in artifact_kinds {
                    collect_used_deps(
                        used,
                        resolve,
                        pkg_id,
                        has_dev_units,
                        artifact_kind,
                        target_data,
                        force_all_targets,
                    )?;
                }
            }
            Ok(())
        }

        // This is sorted by PackageId to get consistent behavior and error
        // messages for Cargo's testsuite. Perhaps there is a better ordering
        // that optimizes download time?
        let mut to_download = BTreeSet::new();

        for id in root_ids {
            for requested_kind in requested_kinds {
                collect_used_deps(
                    &mut to_download,
                    resolve,
                    *id,
                    has_dev_units,
                    *requested_kind,
                    target_data,
                    force_all_targets,
                )?;
            }
        }
        // Drop the compile-kind component; only the package matters for
        // downloading, and the set dedupes.
        let to_download = to_download
            .into_iter()
            .map(|(p, _)| p)
            .collect::<BTreeSet<_>>();
        self.get_many(to_download.into_iter())?;
        Ok(())
    }

    /// Check if there are any dependency packages that violate artifact constraints
    /// to instantly abort, or that do not have any libs which results in warnings.
    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
        &self,
        ws: &Workspace<'gctx>,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        // For each root, collect dependency packages that are depended on as a
        // lib but expose no lib target.
        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
            .iter()
            .map(|&root_id| {
                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
                    root_id,
                    resolve,
                    has_dev_units,
                    requested_kinds,
                    target_data,
                    force_all_targets,
                )
                .collect();

                let dep_pkgs_and_deps = dep_pkgs_to_deps
                    .into_iter()
                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
                    .filter_map(|(dep_package_id, deps)| {
                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
                        })
                    })
                    .collect();
                (root_id, dep_pkgs_and_deps)
            })
            .collect();

        for (pkg_id, dep_pkgs) in no_lib_pkgs {
            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
                // Warn for non-artifact deps and artifact deps declared as libs.
                for dep in deps.iter().filter(|dep| {
                    dep.artifact()
                        .map(|artifact| artifact.is_lib())
                        .unwrap_or(true)
                }) {
                    ws.gctx().shell().warn(&format!(
                        "{} ignoring invalid dependency `{}` which is missing a lib target",
                        pkg_id,
                        dep.name_in_toml(),
                    ))?;
                }
            }
        }
        Ok(())
    }

    /// Yields the dependencies of `pkg_id` in `resolve` that are active for
    /// the requested kinds, dropping dev-dependencies when `has_dev_units` is
    /// `No` and platform-inactive deps unless `force_all_targets` is set.
    fn filter_deps<'a>(
        pkg_id: PackageId,
        resolve: &'a Resolve,
        has_dev_units: HasDevUnits,
        requested_kinds: &'a [CompileKind],
        target_data: &'a RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
        resolve
            .deps(pkg_id)
            .filter(move |&(_id, deps)| {
                deps.iter().any(|dep| {
                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
                        return false;
                    }
                    if force_all_targets == ForceAllTargets::No {
                        // A dep is active if it applies to any requested kind
                        // or to the host.
                        let activated = requested_kinds
                            .iter()
                            .chain(Some(&CompileKind::Host))
                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
                        if !activated {
                            return false;
                        }
                    }
                    true
                })
            })
            // NOTE(review): `.into_iter()` on an iterator is a no-op and
            // could be removed.
            .into_iter()
    }

    /// Borrows the source map (read-only).
    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
        self.sources.borrow()
    }

    /// Borrows the source map mutably.
    pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'gctx>> {
        self.sources.borrow_mut()
    }

    /// Merge the given set into self.
    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
        // Merging mid-download would desynchronize state; forbid it.
        assert!(!self.downloading.get());
        assert!(!set.downloading.get());
        for (pkg_id, p_cell) in set.packages {
            // Keep our existing slot when both sets know the package.
            self.packages.entry(pkg_id).or_insert(p_cell);
        }
        let mut sources = self.sources.borrow_mut();
        let other_sources = set.sources.into_inner();
        sources.add_source_map(other_sources);
    }
}
681
682impl<'a, 'gctx> Downloads<'a, 'gctx> {
683    /// Starts to download the package for the `id` specified.
684    ///
685    /// Returns `None` if the package is queued up for download and will
686    /// eventually be returned from `wait_for_download`. Returns `Some(pkg)` if
687    /// the package is ready and doesn't need to be downloaded.
688    #[tracing::instrument(skip_all)]
689    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
690        self.start_inner(id)
691            .with_context(|| format!("failed to download `{}`", id))
692    }
693
694    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
695        // First up see if we've already cached this package, in which case
696        // there's nothing to do.
697        let slot = self
698            .set
699            .packages
700            .get(&id)
701            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
702        if let Some(pkg) = slot.borrow() {
703            return Ok(Some(pkg));
704        }
705
706        // Ask the original source for this `PackageId` for the corresponding
707        // package. That may immediately come back and tell us that the package
708        // is ready, or it could tell us that it needs to be downloaded.
709        let mut sources = self.set.sources.borrow_mut();
710        let source = sources
711            .get_mut(id.source_id())
712            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
713        let pkg = source
714            .download(id)
715            .context("unable to get packages from source")?;
716        let (url, descriptor, authorization) = match pkg {
717            MaybePackage::Ready(pkg) => {
718                debug!("{} doesn't need a download", id);
719                assert!(slot.fill(pkg).is_ok());
720                return Ok(Some(slot.borrow().unwrap()));
721            }
722            MaybePackage::Download {
723                url,
724                descriptor,
725                authorization,
726            } => (url, descriptor, authorization),
727        };
728
729        // Ok we're going to download this crate, so let's set up all our
730        // internal state and hand off an `Easy` handle to our libcurl `Multi`
731        // handle. This won't actually start the transfer, but later it'll
732        // happen during `wait_for_download`
733        let token = self.next;
734        self.next += 1;
735        debug!(target: "network", "downloading {} as {}", id, token);
736        assert!(self.pending_ids.insert(id));
737
738        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
739        handle.get(true)?;
740        handle.url(&url)?;
741        handle.follow_location(true)?; // follow redirects
742
743        // Add authorization header.
744        if let Some(authorization) = authorization {
745            let mut headers = curl::easy::List::new();
746            headers.append(&format!("Authorization: {}", authorization))?;
747            handle.http_headers(headers)?;
748        }
749
750        // Enable HTTP/2 if possible.
751        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);
752
753        handle.write_function(move |buf| {
754            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
755            tls::with(|downloads| {
756                if let Some(downloads) = downloads {
757                    downloads.pending[&token]
758                        .0
759                        .data
760                        .borrow_mut()
761                        .extend_from_slice(buf);
762                }
763            });
764            Ok(buf.len())
765        })?;
766        handle.header_function(move |data| {
767            tls::with(|downloads| {
768                if let Some(downloads) = downloads {
769                    // Headers contain trailing \r\n, trim them to make it easier
770                    // to work with.
771                    let h = String::from_utf8_lossy(data).trim().to_string();
772                    downloads.pending[&token].0.headers.borrow_mut().push(h);
773                }
774            });
775            true
776        })?;
777
778        handle.progress(true)?;
779        handle.progress_function(move |dl_total, dl_cur, _, _| {
780            tls::with(|downloads| match downloads {
781                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
782                None => false,
783            })
784        })?;
785
786        // If the progress bar isn't enabled then it may be awhile before the
787        // first crate finishes downloading so we inform immediately that we're
788        // downloading crates here.
789        if self.downloads_finished == 0
790            && self.pending.is_empty()
791            && !self.progress.borrow().as_ref().unwrap().is_enabled()
792        {
793            self.set.gctx.shell().status("Downloading", "crates ...")?;
794        }
795
796        let dl = Download {
797            token,
798            data: RefCell::new(Vec::new()),
799            headers: RefCell::new(Vec::new()),
800            id,
801            url,
802            descriptor,
803            total: Cell::new(0),
804            current: Cell::new(0),
805            start: Instant::now(),
806            timed_out: Cell::new(None),
807            retry: Retry::new(self.set.gctx)?,
808        };
809        self.enqueue(dl, handle)?;
810        self.tick(WhyTick::DownloadStarted)?;
811
812        Ok(None)
813    }
814
815    /// Returns the number of crates that are still downloading.
816    pub fn remaining(&self) -> usize {
817        self.pending.len() + self.sleeping.len()
818    }
819
    /// Blocks the current thread waiting for a package to finish downloading.
    ///
    /// This method will wait for a previously enqueued package to finish
    /// downloading and return a reference to it after it's done downloading.
    ///
    /// # Panics
    ///
    /// This function will panic if there are no remaining downloads.
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            // `pending` and `pending_ids` are kept in lockstep (see the
            // removals below and the inserts in `enqueue`/`add_sleepers`);
            // a mismatch would indicate a bookkeeping bug.
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            // Move the accumulated response body/headers out of the
            // transfer's buffers without cloning them.
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            // Detach the easy handle from the `Multi` so we can query its
            // response code below (and re-`enqueue` it if we retry).
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            // Check if this was a spurious error. If it was a spurious error
            // then we want to re-enqueue our request for another attempt and
            // then we wait for another request to finish.
            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        // If this error is "aborted by callback" then that's
                        // probably because our progress callback aborted due to
                        // a timeout. We'll find out by looking at the
                        // `timed_out` field, looking for a descriptive message.
                        // If one is found we switch the error code (to ensure
                        // it's flagged as spurious) and then attach our extra
                        // information to the error.
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    // A response code of 0 means curl didn't report one
                    // (e.g. non-HTTP transports), so it's treated as success
                    // alongside 200.
                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)))
                }
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    // Park the transfer; `add_sleepers` re-enqueues it once
                    // the backoff delay has elapsed.
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        // If the progress bar isn't enabled then we still want to provide some
        // semblance of progress of how we're downloading crates, and if the
        // progress bar is enabled this provides a good log of what's happening.
        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        // Track the largest crate seen so far for the summary printed when
        // `Downloads` is dropped.
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        // We're about to synchronously extract the crate below. While we're
        // doing that our download progress won't actually be updated, nor do we
        // have a great view into the progress of the extraction. Let's prepare
        // the user for this CPU-heavy step if it looks like it'll take some
        // time to do so.
        if dl.total.get() < ByteSize::kb(400).0 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        // Inform the original source that the download is finished which
        // should allow us to actually get the package and fill it in now.
        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        // Assume that no time has passed while we were calling
        // `finish_download`, update all speed checks and timeout limits of all
        // active downloads to make sure they don't fire because of a slowly
        // extracted tarball.
        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        // NOTE(review): the `packages` slot is expected to be empty at this
        // point — filling it a second time would trip this assert.
        let slot = &self.set.packages[&dl.id];
        assert!(slot.fill(pkg).is_ok());
        Ok(slot.borrow().unwrap())
    }
945
946    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
947        let mut handle = self.set.multi.add(handle)?;
948        let now = Instant::now();
949        handle.set_token(dl.token)?;
950        self.updated_at.set(now);
951        self.next_speed_check.set(now + self.timeout.dur);
952        self.next_speed_check_bytes_threshold
953            .set(u64::from(self.timeout.low_speed_limit));
954        dl.timed_out.set(None);
955        dl.current.set(0);
956        dl.total.set(0);
957        self.pending.insert(dl.token, (dl, handle));
958        Ok(())
959    }
960
    /// Block, waiting for curl. Returns a token and a `Result` for that token
    /// (`Ok` means the download successfully finished).
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        // This is the main workhorse loop. We use libcurl's portable `wait`
        // method to actually perform blocking. This isn't necessarily too
        // efficient in terms of fd management, but we should only be juggling
        // a few anyway.
        //
        // Here we start off by asking the `multi` handle to do some work via
        // the `perform` method. This will actually do I/O work (non-blocking)
        // and attempt to make progress. Afterwards we ask about the `messages`
        // contained in the handle which will inform us if anything has finished
        // transferring.
        //
        // If we've got a finished transfer after all that work we break out
        // and process the finished transfer at the end. Otherwise we need to
        // actually block waiting for I/O to happen, which we achieve with the
        // `wait` method on `multi`.
        loop {
            // Re-enqueue any sleeping retries whose backoff has elapsed
            // before driving the event loop again.
            self.add_sleepers()?;
            // Expose `self` through thread-local storage while curl runs so
            // the callbacks registered on the easy handles can reach back
            // into this `Downloads` (see the `tls` module below).
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            // Split the borrows so the `messages` closure can push into
            // `results` while reading from `pending`.
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            // Nothing finished yet; there must be something in flight or
            // sleeping, otherwise we'd block here forever.
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                // Only sleeping retries remain, so just sleep the thread
                // until the next one is due.
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                // Cap the wait at one second so we keep servicing retry
                // wakeups and the loop above reasonably often, even if curl
                // suggests a longer timeout.
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }
1019
1020    fn add_sleepers(&mut self) -> CargoResult<()> {
1021        for (dl, handle) in self.sleeping.to_retry() {
1022            self.pending_ids.insert(dl.id);
1023            self.enqueue(dl, handle)?;
1024        }
1025        Ok(())
1026    }
1027
    /// Progress callback for a single download, invoked by curl via the TLS
    /// indirection set up in `start`/`wait_for_curl`. Returning `false`
    /// aborts the transfer; `wait` then inspects `timed_out` to turn that
    /// abort into a descriptive (and retryable) timeout error.
    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        // Only newly-received bytes count towards the speed check; curl may
        // invoke this callback without any fresh data.
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            if delta >= threshold {
                // Byte budget met: push the next speed check out by a full
                // timeout window and reset the budget.
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                // Partially met: shrink the remaining byte budget.
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        // If we've spent too long not actually receiving any data we time out.
        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        // If we reached the point in time that we need to check our speed
        // limit, see if we've transferred enough data during this threshold. If
        // it fails this check then we fail because the download is going too
        // slowly.
        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            // A zero threshold would mean the budget was fully consumed,
            // which the branch above handles by resetting the check.
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }
1083
1084    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
1085        let mut progress = self.progress.borrow_mut();
1086        let progress = progress.as_mut().unwrap();
1087
1088        if let WhyTick::DownloadUpdate = why {
1089            if !progress.update_allowed() {
1090                return Ok(());
1091            }
1092        }
1093        let pending = self.remaining();
1094        let mut msg = if pending == 1 {
1095            format!("{} crate", pending)
1096        } else {
1097            format!("{} crates", pending)
1098        };
1099        match why {
1100            WhyTick::Extracting(krate) => {
1101                msg.push_str(&format!(", extracting {} ...", krate));
1102            }
1103            _ => {
1104                let mut dur = Duration::new(0, 0);
1105                let mut remaining = 0;
1106                for (dl, _) in self.pending.values() {
1107                    dur += dl.start.elapsed();
1108                    // If the total/current look weird just throw out the data
1109                    // point, sounds like curl has more to learn before we have
1110                    // the true information.
1111                    if dl.total.get() >= dl.current.get() {
1112                        remaining += dl.total.get() - dl.current.get();
1113                    }
1114                }
1115                if remaining > 0 && dur > Duration::from_millis(500) {
1116                    msg.push_str(&format!(", remaining bytes: {}", ByteSize(remaining)));
1117                }
1118            }
1119        }
1120        progress.print_now(&msg)
1121    }
1122}
1123
/// Reason a progress update (`Downloads::tick`) was requested; the variant
/// controls throttling and the extra detail appended to the status message.
#[derive(Copy, Clone)]
enum WhyTick<'a> {
    /// A new download was just enqueued.
    DownloadStarted,
    /// Data was received for an in-flight download (rate-limited in `tick`).
    DownloadUpdate,
    /// A download finished (used for small crates; see `wait`).
    DownloadFinished,
    /// A crate is being extracted; carries the crate name.
    Extracting(&'a str),
}
1131
1132impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
1133    fn drop(&mut self) {
1134        self.set.downloading.set(false);
1135        let progress = self.progress.get_mut().take().unwrap();
1136        // Don't print a download summary if we're not using a progress bar,
1137        // we've already printed lots of `Downloading...` items.
1138        if !progress.is_enabled() {
1139            return;
1140        }
1141        // If we didn't download anything, no need for a summary.
1142        if self.downloads_finished == 0 {
1143            return;
1144        }
1145        // If an error happened, let's not clutter up the output.
1146        if !self.success {
1147            return;
1148        }
1149        // pick the correct plural of crate(s)
1150        let crate_string = if self.downloads_finished == 1 {
1151            "crate"
1152        } else {
1153            "crates"
1154        };
1155        let mut status = format!(
1156            "{} {} ({}) in {}",
1157            self.downloads_finished,
1158            crate_string,
1159            ByteSize(self.downloaded_bytes),
1160            util::elapsed(self.start.elapsed())
1161        );
1162        // print the size of largest crate if it was >1mb
1163        // however don't print if only a single crate was downloaded
1164        // because it is obvious that it will be the largest then
1165        if self.largest.0 > ByteSize::mb(1).0 && self.downloads_finished > 1 {
1166            status.push_str(&format!(
1167                " (largest was `{}` at {})",
1168                self.largest.1,
1169                ByteSize(self.largest.0),
1170            ));
1171        }
1172        // Clear progress before displaying final summary.
1173        drop(progress);
1174        drop(self.set.gctx.shell().status("Downloaded", status));
1175    }
1176}
1177
1178mod tls {
1179    use std::cell::Cell;
1180
1181    use super::Downloads;
1182
1183    thread_local!(static PTR: Cell<usize> = Cell::new(0));
1184
1185    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
1186        let ptr = PTR.with(|p| p.get());
1187        if ptr == 0 {
1188            f(None)
1189        } else {
1190            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
1191        }
1192    }
1193
1194    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
1195        struct Reset<'a, T: Copy>(&'a Cell<T>, T);
1196
1197        impl<'a, T: Copy> Drop for Reset<'a, T> {
1198            fn drop(&mut self) {
1199                self.0.set(self.1);
1200            }
1201        }
1202
1203        PTR.with(|p| {
1204            let _reset = Reset(p, p.get());
1205            p.set(dl as *const Downloads<'_, '_> as usize);
1206            f()
1207        })
1208    }
1209}