1use std::cell::OnceCell;
2use std::cell::{Cell, Ref, RefCell};
3use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
5use std::fmt;
6use std::hash;
7use std::mem;
8use std::path::{Path, PathBuf};
9use std::rc::Rc;
10use std::time::{Duration, Instant};
11
12use anyhow::Context as _;
13use cargo_util_schemas::manifest::{Hints, RustVersion};
14use curl::easy::Easy;
15use curl::multi::{EasyHandle, Multi};
16use semver::Version;
17use serde::Serialize;
18use tracing::debug;
19
20use crate::core::compiler::{CompileKind, RustcTargetData};
21use crate::core::dependency::DepKind;
22use crate::core::resolver::features::ForceAllTargets;
23use crate::core::resolver::{HasDevUnits, Resolve};
24use crate::core::{
25 CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
26 SourceId, Target,
27};
28use crate::core::{Summary, Workspace};
29use crate::sources::source::{MaybePackage, SourceMap};
30use crate::util::HumanBytes;
31use crate::util::cache_lock::{CacheLock, CacheLockMode};
32use crate::util::errors::{CargoResult, HttpNotSuccessful};
33use crate::util::interning::InternedString;
34use crate::util::network::http::HttpTimeout;
35use crate::util::network::http::http_handle_and_timeout;
36use crate::util::network::retry::{Retry, RetryResult};
37use crate::util::network::sleep::SleepTracker;
38use crate::util::{self, GlobalContext, Progress, ProgressStyle, internal};
39
/// A Cargo package: a manifest plus the location it was loaded from.
///
/// Cloning is cheap — the data is shared behind an `Rc` and only copied
/// on mutation (see `Package::manifest_mut`).
#[derive(Clone)]
pub struct Package {
    inner: Rc<PackageInner>,
}
47
/// Shared payload of a [`Package`], kept behind an `Rc` so clones are cheap.
#[derive(Clone)]
struct PackageInner {
    /// The package's manifest.
    manifest: Manifest,
    /// The root of the package (path to the manifest file).
    manifest_path: PathBuf,
}
56
57impl Ord for Package {
58 fn cmp(&self, other: &Package) -> Ordering {
59 self.package_id().cmp(&other.package_id())
60 }
61}
62
63impl PartialOrd for Package {
64 fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
65 Some(self.cmp(other))
66 }
67}
68
/// The JSON-serializable representation of a [`Package`], as produced by
/// [`Package::serialized`] (e.g. for `cargo metadata` output).
///
/// NOTE(review): the field set and serde attributes define an externally
/// visible JSON schema — do not reorder/rename without considering
/// consumers of the metadata format.
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    targets: Vec<Target>,
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    // Omitted from the JSON entirely when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
    // Omitted from the JSON entirely when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    hints: Option<Hints>,
}
101
102impl Package {
103 pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
105 Package {
106 inner: Rc::new(PackageInner {
107 manifest,
108 manifest_path: manifest_path.to_path_buf(),
109 }),
110 }
111 }
112
113 pub fn dependencies(&self) -> &[Dependency] {
115 self.manifest().dependencies()
116 }
117 pub fn manifest(&self) -> &Manifest {
119 &self.inner.manifest
120 }
121 pub fn manifest_mut(&mut self) -> &mut Manifest {
123 &mut Rc::make_mut(&mut self.inner).manifest
124 }
125 pub fn manifest_path(&self) -> &Path {
127 &self.inner.manifest_path
128 }
129 pub fn name(&self) -> InternedString {
131 self.package_id().name()
132 }
133 pub fn package_id(&self) -> PackageId {
135 self.manifest().package_id()
136 }
137 pub fn root(&self) -> &Path {
139 self.manifest_path().parent().unwrap()
140 }
141 pub fn summary(&self) -> &Summary {
143 self.manifest().summary()
144 }
145 pub fn targets(&self) -> &[Target] {
147 self.manifest().targets()
148 }
149 pub fn library(&self) -> Option<&Target> {
151 self.targets().iter().find(|t| t.is_lib())
152 }
153 pub fn version(&self) -> &Version {
155 self.package_id().version()
156 }
157 pub fn authors(&self) -> &Vec<String> {
159 &self.manifest().metadata().authors
160 }
161
162 pub fn publish(&self) -> &Option<Vec<String>> {
166 self.manifest().publish()
167 }
168 pub fn proc_macro(&self) -> bool {
170 self.targets().iter().any(|target| target.proc_macro())
171 }
172 pub fn rust_version(&self) -> Option<&RustVersion> {
174 self.manifest().rust_version()
175 }
176
177 pub fn hints(&self) -> Option<&Hints> {
179 self.manifest().hints()
180 }
181
182 pub fn has_custom_build(&self) -> bool {
184 self.targets().iter().any(|t| t.is_custom_build())
185 }
186
187 pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
188 Package {
189 inner: Rc::new(PackageInner {
190 manifest: self.manifest().clone().map_source(to_replace, replace_with),
191 manifest_path: self.manifest_path().to_owned(),
192 }),
193 }
194 }
195
196 pub fn serialized(
197 &self,
198 unstable_flags: &CliUnstable,
199 cargo_features: &Features,
200 ) -> SerializedPackage {
201 let summary = self.manifest().summary();
202 let package_id = summary.package_id();
203 let manmeta = self.manifest().metadata();
204 let targets: Vec<Target> = self
208 .manifest()
209 .targets()
210 .iter()
211 .filter(|t| t.src_path().is_path())
212 .cloned()
213 .collect();
214 let crate_features = summary
216 .features()
217 .iter()
218 .map(|(k, v)| (*k, v.iter().map(|fv| fv.to_string().into()).collect()))
219 .collect();
220
221 SerializedPackage {
222 name: package_id.name(),
223 version: package_id.version().clone(),
224 id: package_id.to_spec(),
225 license: manmeta.license.clone(),
226 license_file: manmeta.license_file.clone(),
227 description: manmeta.description.clone(),
228 source: summary.source_id(),
229 dependencies: summary
230 .dependencies()
231 .iter()
232 .map(|dep| dep.serialized(unstable_flags, cargo_features))
233 .collect(),
234 targets,
235 features: crate_features,
236 manifest_path: self.manifest_path().to_path_buf(),
237 metadata: self.manifest().custom_metadata().cloned(),
238 authors: manmeta.authors.clone(),
239 categories: manmeta.categories.clone(),
240 keywords: manmeta.keywords.clone(),
241 readme: manmeta.readme.clone(),
242 repository: manmeta.repository.clone(),
243 homepage: manmeta.homepage.clone(),
244 documentation: manmeta.documentation.clone(),
245 edition: self.manifest().edition().to_string(),
246 links: self.manifest().links().map(|s| s.to_owned()),
247 metabuild: self.manifest().metabuild().cloned(),
248 publish: self.publish().as_ref().cloned(),
249 default_run: self.manifest().default_run().map(|s| s.to_owned()),
250 rust_version: self.rust_version().cloned(),
251 hints: self.hints().cloned(),
252 }
253 }
254}
255
256impl fmt::Display for Package {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 write!(f, "{}", self.summary().package_id())
259 }
260}
261
262impl fmt::Debug for Package {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 f.debug_struct("Package")
265 .field("id", &self.summary().package_id())
266 .field("..", &"..")
267 .finish()
268 }
269}
270
271impl PartialEq for Package {
272 fn eq(&self, other: &Package) -> bool {
273 self.package_id() == other.package_id()
274 }
275}
276
// `Package` equality (by `PackageId`) is a total equivalence relation.
impl Eq for Package {}
278
279impl hash::Hash for Package {
280 fn hash<H: hash::Hasher>(&self, into: &mut H) {
281 self.package_id().hash(into)
282 }
283}
284
/// A set of packages, with the intent to download them.
///
/// Primarily used to convert `PackageId`s into `Package`s, downloading on
/// demand and caching the result per id.
pub struct PackageSet<'gctx> {
    /// One slot per tracked id; the cell is filled once the package is ready.
    packages: HashMap<PackageId, OnceCell<Package>>,
    /// The sources the packages are obtained from.
    sources: RefCell<SourceMap<'gctx>>,
    gctx: &'gctx GlobalContext,
    /// curl multi-handle shared by all concurrent downloads in a session.
    multi: Multi,
    /// `true` while a `Downloads` session is live; used to assert that
    /// sessions never overlap (see `enable_download`).
    downloading: Cell<bool>,
    /// Whether connection multiplexing was requested on `multi`.
    multiplexing: bool,
}
299
/// A single active session of concurrent downloads over a [`PackageSet`].
pub struct Downloads<'a, 'gctx> {
    /// The set this session downloads into.
    set: &'a PackageSet<'gctx>,
    /// In-flight transfers, keyed by the token assigned in `start_inner`.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Ids of the packages currently in `pending`.
    pending_ids: HashSet<PackageId>,
    /// Downloads waiting out a retry delay before being re-enqueued.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// Completed transfer results not yet consumed by `wait`.
    results: Vec<(usize, Result<(), curl::Error>)>,
    /// Next token to hand out.
    next: usize,
    /// Progress bar; `Some` until taken by `Drop`.
    progress: RefCell<Option<Progress<'gctx>>>,
    /// Number of downloads completed so far.
    downloads_finished: usize,
    /// Total bytes across all finished downloads.
    downloaded_bytes: u64,
    /// Size and crate name of the largest finished download.
    largest: (u64, InternedString),
    /// When this session started; used for the final summary line.
    start: Instant,
    /// Set to `true` by the owner once everything succeeded; gates the
    /// "Downloaded N crates" summary emitted in `Drop`.
    success: bool,

    /// Timeout and low-speed-limit configuration for stall detection.
    timeout: HttpTimeout,
    /// Last time data arrived; pushed forward after `finish_download` so
    /// unpack time doesn't count against the network timeout.
    updated_at: Cell<Instant>,
    /// Deadline for the next transfer-speed check (see `progress`).
    next_speed_check: Cell<Instant>,
    /// Bytes that must arrive before `next_speed_check` to pass the check.
    next_speed_check_bytes_threshold: Cell<u64>,
    /// Package-cache lock held for the whole session.
    _lock: CacheLock<'gctx>,
}
357
/// State for one in-flight download.
struct Download<'gctx> {
    /// Token identifying this download; key of `Downloads::pending` and also
    /// stored on the curl handle via `set_token`.
    token: usize,

    /// The package being downloaded.
    id: PackageId,

    /// Bytes received so far, appended by the curl write callback.
    data: RefCell<Vec<u8>>,

    /// Response headers, collected for error diagnostics.
    headers: RefCell<Vec<String>>,

    /// The URL being fetched.
    url: String,

    /// Human-readable description printed when the download finishes.
    descriptor: String,

    /// Total expected size as reported by curl's progress callback.
    total: Cell<u64>,
    /// Bytes transferred so far per the progress callback.
    current: Cell<u64>,

    /// When this download started.
    start: Instant,
    /// Timeout message recorded by the progress callback; translated into a
    /// curl timeout error in `wait`.
    timed_out: Cell<Option<String>>,

    /// Retry state for transient network failures.
    retry: Retry<'gctx>,
}
390
impl<'gctx> PackageSet<'gctx> {
    /// Creates a set of packages for the given ids, to be downloaded from
    /// `sources` as needed.
    pub fn new(
        package_ids: &[PackageId],
        sources: SourceMap<'gctx>,
        gctx: &'gctx GlobalContext,
    ) -> CargoResult<PackageSet<'gctx>> {
        let mut multi = Multi::new();
        // Multiplexing downloads over one connection is preferred; it can be
        // turned off through the `http.multiplexing` config value.
        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
        multi
            .pipelining(false, multiplexing)
            .context("failed to enable multiplexing/pipelining in curl")?;

        // Cap the number of simultaneous connections opened to any one host.
        multi.set_max_host_connections(2)?;

        Ok(PackageSet {
            packages: package_ids
                .iter()
                .map(|&id| (id, OnceCell::new()))
                .collect(),
            sources: RefCell::new(sources),
            gctx,
            multi,
            downloading: Cell::new(false),
            multiplexing,
        })
    }

    /// Iterates over all ids tracked by this set, downloaded or not.
    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
        self.packages.keys().cloned()
    }

    /// Iterates over only the packages that have already been downloaded.
    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.packages.values().filter_map(|p| p.get())
    }

    /// Starts a download session; panics if one is already active for this
    /// set. Acquires the download-exclusive package cache lock for the
    /// lifetime of the returned `Downloads`.
    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
        assert!(!self.downloading.replace(true));
        let timeout = HttpTimeout::new(self.gctx)?;
        Ok(Downloads {
            start: Instant::now(),
            set: self,
            next: 0,
            pending: HashMap::new(),
            pending_ids: HashSet::new(),
            sleeping: SleepTracker::new(),
            results: Vec::new(),
            progress: RefCell::new(Some(Progress::with_style(
                "Downloading",
                ProgressStyle::Ratio,
                self.gctx,
            ))),
            downloads_finished: 0,
            downloaded_bytes: 0,
            largest: (0, "".into()),
            success: false,
            updated_at: Cell::new(Instant::now()),
            timeout,
            next_speed_check: Cell::new(Instant::now()),
            next_speed_check_bytes_threshold: Cell::new(0),
            _lock: self
                .gctx
                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
        })
    }

    /// Gets the package for `id`, downloading it first if needed.
    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
        // Fast path: already downloaded into its slot.
        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.get()) {
            return Ok(pkg);
        }
        Ok(self.get_many(Some(id))?.remove(0))
    }

    /// Gets the packages for all given ids, downloading any that are missing.
    /// Holds the package cache lock while the downloads run.
    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
        let mut pkgs = Vec::new();
        let _lock = self
            .gctx
            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
        let mut downloads = self.enable_download()?;
        // `start` returns `Some` for packages that were already available;
        // the rest complete asynchronously and are drained via `wait`.
        for id in ids {
            pkgs.extend(downloads.start(id)?);
        }
        while downloads.remaining() > 0 {
            pkgs.push(downloads.wait()?);
        }
        downloads.success = true;
        drop(downloads);

        // Flush deferred "last use" tracking; errors are intentionally
        // swallowed (`save_no_error`) as this is best-effort bookkeeping.
        let mut deferred = self.gctx.deferred_global_last_use()?;
        deferred.save_no_error(self.gctx);
        Ok(pkgs)
    }

    /// Downloads every package reachable from `root_ids` in the resolve
    /// graph, after filtering by dev-units, platform activation, and
    /// requested compile kinds.
    #[tracing::instrument(skip_all)]
    pub fn download_accessible(
        &self,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'gctx>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        // Recursively walks the resolve graph from `pkg_id`, recording every
        // reachable `(package, compile kind)` pair in `used`. Artifact
        // dependencies can introduce additional compile kinds via their
        // `target` field, hence the second recursion below.
        fn collect_used_deps(
            used: &mut BTreeSet<(PackageId, CompileKind)>,
            resolve: &Resolve,
            pkg_id: PackageId,
            has_dev_units: HasDevUnits,
            requested_kind: CompileKind,
            target_data: &RustcTargetData<'_>,
            force_all_targets: ForceAllTargets,
        ) -> CargoResult<()> {
            // Keying `used` by (id, kind) both deduplicates work and breaks
            // cycles in the recursion.
            if !used.insert((pkg_id, requested_kind)) {
                return Ok(());
            }
            let requested_kinds = &[requested_kind];
            let filtered_deps = PackageSet::filter_deps(
                pkg_id,
                resolve,
                has_dev_units,
                requested_kinds,
                target_data,
                force_all_targets,
            );
            for (pkg_id, deps) in filtered_deps {
                collect_used_deps(
                    used,
                    resolve,
                    pkg_id,
                    has_dev_units,
                    requested_kind,
                    target_data,
                    force_all_targets,
                )?;
                // Artifact dependencies may resolve to a different compile
                // kind than the one currently requested.
                let artifact_kinds = deps.iter().filter_map(|dep| {
                    Some(
                        dep.artifact()?
                            .target()?
                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
                    )
                });
                for artifact_kind in artifact_kinds {
                    collect_used_deps(
                        used,
                        resolve,
                        pkg_id,
                        has_dev_units,
                        artifact_kind,
                        target_data,
                        force_all_targets,
                    )?;
                }
            }
            Ok(())
        }

        let mut to_download = BTreeSet::new();

        for id in root_ids {
            for requested_kind in requested_kinds {
                collect_used_deps(
                    &mut to_download,
                    resolve,
                    *id,
                    has_dev_units,
                    *requested_kind,
                    target_data,
                    force_all_targets,
                )?;
            }
        }
        // Reduce (id, kind) pairs down to the unique set of ids to fetch.
        let to_download = to_download
            .into_iter()
            .map(|(p, _)| p)
            .collect::<BTreeSet<_>>();
        self.get_many(to_download.into_iter())?;
        Ok(())
    }

    /// Warns about dependencies that are expected to provide a library but
    /// whose package has no lib target (their lib/artifact-lib requirement
    /// cannot be satisfied and the dependency is ignored).
    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
        &self,
        ws: &Workspace<'gctx>,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        // For each root, collect its dependencies that want a lib but whose
        // package has no lib target. `get_one` failures are skipped (`ok()`),
        // so only already-available packages are inspected.
        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
            .iter()
            .map(|&root_id| {
                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
                    root_id,
                    resolve,
                    has_dev_units,
                    requested_kinds,
                    target_data,
                    force_all_targets,
                )
                .collect();

                let dep_pkgs_and_deps = dep_pkgs_to_deps
                    .into_iter()
                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
                    .filter_map(|(dep_package_id, deps)| {
                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
                        })
                    })
                    .collect();
                (root_id, dep_pkgs_and_deps)
            })
            .collect();

        for (pkg_id, dep_pkgs) in no_lib_pkgs {
            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
                // Warn only for deps that actually require a lib: either an
                // artifact dependency with a lib requirement, or a plain
                // (non-artifact) dependency.
                for dep in deps.iter().filter(|dep| {
                    dep.artifact()
                        .map(|artifact| artifact.is_lib())
                        .unwrap_or(true)
                }) {
                    ws.gctx().shell().warn(&format!(
                        "{} ignoring invalid dependency `{}` which is missing a lib target",
                        pkg_id,
                        dep.name_in_toml(),
                    ))?;
                }
            }
        }
        Ok(())
    }

    /// Yields the dependencies of `pkg_id` from `resolve`, keeping only edges
    /// with at least one dependency that survives the filters: dev
    /// dependencies are dropped when `has_dev_units` is `No`, and (unless
    /// `force_all_targets`) dependencies whose platform is not activated for
    /// any requested kind (or the host) are dropped.
    pub fn filter_deps<'a>(
        pkg_id: PackageId,
        resolve: &'a Resolve,
        has_dev_units: HasDevUnits,
        requested_kinds: &'a [CompileKind],
        target_data: &'a RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
        resolve
            .deps(pkg_id)
            .filter(move |&(_id, deps)| {
                deps.iter().any(|dep| {
                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
                        return false;
                    }
                    if force_all_targets == ForceAllTargets::No {
                        // The host kind is always considered in addition to
                        // the requested kinds.
                        let activated = requested_kinds
                            .iter()
                            .chain(Some(&CompileKind::Host))
                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
                        if !activated {
                            return false;
                        }
                    }
                    true
                })
            })
            // NOTE(review): `.into_iter()` on something that is already an
            // iterator is a no-op and could be removed.
            .into_iter()
    }

    /// Borrows the source map used by this set.
    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
        self.sources.borrow()
    }

    /// Merges the packages and sources of `set` into `self`. Existing slots
    /// win on conflict. Neither set may have an active download session.
    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
        assert!(!self.downloading.get());
        assert!(!set.downloading.get());
        for (pkg_id, p_cell) in set.packages {
            self.packages.entry(pkg_id).or_insert(p_cell);
        }
        let mut sources = self.sources.borrow_mut();
        let other_sources = set.sources.into_inner();
        sources.add_source_map(other_sources);
    }
}
678
impl<'a, 'gctx> Downloads<'a, 'gctx> {
    /// Starts downloading the package for `id`.
    ///
    /// Returns `Ok(Some(pkg))` when the package is already available and
    /// needs no download; returns `Ok(None)` when a download was enqueued —
    /// the package will eventually be returned from `wait`.
    #[tracing::instrument(skip_all)]
    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        self.start_inner(id)
            .with_context(|| format!("failed to download `{}`", id))
    }

    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        // Fast path: the slot for this id may already be filled.
        let slot = self
            .set
            .packages
            .get(&id)
            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
        if let Some(pkg) = slot.get() {
            return Ok(Some(pkg));
        }

        // Ask the source for the package: it may be ready immediately, or it
        // may hand back a URL to fetch.
        let sources = self.set.sources.borrow_mut();
        let source = sources
            .get(id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
        let pkg = source
            .download(id)
            .context("unable to get packages from source")?;
        let (url, descriptor, authorization) = match pkg {
            MaybePackage::Ready(pkg) => {
                debug!("{} doesn't need a download", id);
                assert!(slot.set(pkg).is_ok());
                return Ok(Some(slot.get().unwrap()));
            }
            MaybePackage::Download {
                url,
                descriptor,
                authorization,
            } => (url, descriptor, authorization),
        };

        // Hand out a fresh token and configure a curl handle for the fetch.
        let token = self.next;
        self.next += 1;
        debug!(target: "network", "downloading {} as {}", id, token);
        assert!(self.pending_ids.insert(id));

        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
        handle.get(true)?;
        handle.url(&url)?;
        handle.follow_location(true)?;
        if let Some(authorization) = authorization {
            let mut headers = curl::easy::List::new();
            headers.append(&format!("Authorization: {}", authorization))?;
            handle.http_headers(headers)?;
        }

        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);

        // The curl callbacks can't capture `&self`, so they reach the live
        // `Downloads` through the thread-local in the `tls` module, installed
        // by `wait_for_curl` around each `perform` call.
        handle.write_function(move |buf| {
            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    downloads.pending[&token]
                        .0
                        .data
                        .borrow_mut()
                        .extend_from_slice(buf);
                }
            });
            Ok(buf.len())
        })?;
        handle.header_function(move |data| {
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    // Headers are only kept for error diagnostics, so a lossy
                    // UTF-8 conversion is acceptable.
                    let h = String::from_utf8_lossy(data).trim().to_string();
                    downloads.pending[&token].0.headers.borrow_mut().push(h);
                }
            });
            true
        })?;

        handle.progress(true)?;
        handle.progress_function(move |dl_total, dl_cur, _, _| {
            // Returning `false` here aborts the transfer (used for timeouts).
            tls::with(|downloads| match downloads {
                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
                None => false,
            })
        })?;

        // If the progress bar is disabled, still print one plain status line
        // for the very first download so the user sees something happening.
        if self.downloads_finished == 0
            && self.pending.is_empty()
            && !self.progress.borrow().as_ref().unwrap().is_enabled()
        {
            self.set.gctx.shell().status("Downloading", "crates ...")?;
        }

        let dl = Download {
            token,
            data: RefCell::new(Vec::new()),
            headers: RefCell::new(Vec::new()),
            id,
            url,
            descriptor,
            total: Cell::new(0),
            current: Cell::new(0),
            start: Instant::now(),
            timed_out: Cell::new(None),
            retry: Retry::new(self.set.gctx)?,
        };
        self.enqueue(dl, handle)?;
        self.tick(WhyTick::DownloadStarted)?;

        Ok(None)
    }

    /// Number of downloads currently in flight or waiting to be retried.
    pub fn remaining(&self) -> usize {
        self.pending.len() + self.sleeping.len()
    }

    /// Blocks until one previously enqueued download finishes, handing the
    /// downloaded bytes to the source and returning the resulting package.
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            // Classify the outcome: success, fatal error, or retry-later.
            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        // An "aborted by callback" error may actually be a
                        // timeout injected by our progress callback; if a
                        // timeout message was recorded, surface it as a curl
                        // timeout error instead of the generic abort.
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    // A response code of 0 is accepted alongside 200 (e.g.
                    // transfers that don't carry an HTTP status).
                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)));
                }
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        // Only bother showing an "extracting" message for larger downloads;
        // small ones unpack quickly enough that it would just flicker.
        let kib_400 = 1024 * 400;
        if dl.total.get() < kib_400 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        // Hand the raw bytes back to the source to turn into a `Package`.
        let sources = self.set.sources.borrow_mut();
        let source = sources
            .get(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        // Time spent in `finish_download` is not network time; push the
        // timeout bookkeeping forward so it doesn't trigger false stalls.
        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        let slot = &self.set.packages[&dl.id];
        assert!(slot.set(pkg).is_ok());
        Ok(slot.get().unwrap())
    }

    // Registers `handle` on the multi-handle, resets per-download counters
    // and timeout state, and records the download as pending.
    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
        let mut handle = self.set.multi.add(handle)?;
        let now = Instant::now();
        handle.set_token(dl.token)?;
        self.updated_at.set(now);
        self.next_speed_check.set(now + self.timeout.dur);
        self.next_speed_check_bytes_threshold
            .set(u64::from(self.timeout.low_speed_limit));
        dl.timed_out.set(None);
        dl.current.set(0);
        dl.total.set(0);
        self.pending.insert(dl.token, (dl, handle));
        Ok(())
    }

    // Drives the curl multi-handle until some transfer completes (either
    // successfully or with an error), returning its token and result.
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        loop {
            // Re-enqueue any downloads whose retry delay has elapsed.
            self.add_sleepers()?;
            // `perform` invokes the C callbacks; `tls::set` makes `self`
            // reachable from them for the duration of the call.
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                // Nothing active; sleep until the next retry is due.
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                // Wait for socket activity, but never longer than one second
                // so the retry queue and timeout checks stay responsive.
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }

    // Moves downloads whose sleep period has expired back into `pending`.
    fn add_sleepers(&mut self) -> CargoResult<()> {
        for (dl, handle) in self.sleeping.to_retry() {
            self.pending_ids.insert(dl.id);
            self.enqueue(dl, handle)?;
        }
        Ok(())
    }

    // curl progress callback body. Updates byte counters and enforces the
    // overall/low-speed timeouts; returning `false` aborts the transfer,
    // with the reason recorded in `timed_out`.
    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            // Once enough bytes have arrived, reset the speed-check window;
            // otherwise just reduce the remaining threshold.
            if delta >= threshold {
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        // No data at all for longer than the timeout window: abort.
        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        // Data arrived, but too slowly to meet the low-speed limit: abort.
        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }

    // Refreshes the progress bar. Pure `DownloadUpdate` ticks are
    // rate-limited; all other events repaint unconditionally.
    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
        let mut progress = self.progress.borrow_mut();
        let progress = progress.as_mut().unwrap();

        if let WhyTick::DownloadUpdate = why {
            if !progress.update_allowed() {
                return Ok(());
            }
        }
        let pending = self.remaining();
        let mut msg = if pending == 1 {
            format!("{} crate", pending)
        } else {
            format!("{} crates", pending)
        };
        match why {
            WhyTick::Extracting(krate) => {
                msg.push_str(&format!(", extracting {} ...", krate));
            }
            _ => {
                let mut dur = Duration::new(0, 0);
                let mut remaining = 0;
                for (dl, _) in self.pending.values() {
                    dur += dl.start.elapsed();
                    // `total` can lag behind `current` early in a transfer,
                    // so guard against underflow.
                    if dl.total.get() >= dl.current.get() {
                        remaining += dl.total.get() - dl.current.get();
                    }
                }
                if remaining > 0 && dur > Duration::from_millis(500) {
                    msg.push_str(&format!(", remaining bytes: {:.1}", HumanBytes(remaining)));
                }
            }
        }
        progress.print_now(&msg)
    }
}
1121
/// Reason a progress-bar refresh was requested (see `Downloads::tick`).
#[derive(Copy, Clone)]
enum WhyTick<'a> {
    DownloadStarted,
    /// Periodic update from the curl progress callback; rate-limited.
    DownloadUpdate,
    DownloadFinished,
    /// A larger download finished and is being extracted; carries the crate
    /// name shown in the progress message.
    Extracting(&'a str),
}
1129
1130impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
1131 fn drop(&mut self) {
1132 self.set.downloading.set(false);
1133 let progress = self.progress.get_mut().take().unwrap();
1134 if !progress.is_enabled() {
1137 return;
1138 }
1139 if self.downloads_finished == 0 {
1141 return;
1142 }
1143 if !self.success {
1145 return;
1146 }
1147 let crate_string = if self.downloads_finished == 1 {
1149 "crate"
1150 } else {
1151 "crates"
1152 };
1153 let mut status = format!(
1154 "{} {} ({:.1}) in {}",
1155 self.downloads_finished,
1156 crate_string,
1157 HumanBytes(self.downloaded_bytes),
1158 util::elapsed(self.start.elapsed())
1159 );
1160 let mib_1 = 1024 * 1024;
1164 if self.largest.0 > mib_1 && self.downloads_finished > 1 {
1165 status.push_str(&format!(
1166 " (largest was `{}` at {:.1})",
1167 self.largest.1,
1168 HumanBytes(self.largest.0),
1169 ));
1170 }
1171 drop(progress);
1173 drop(self.set.gctx.shell().status("Downloaded", status));
1174 }
1175}
1176
/// Thread-local storage used to smuggle a reference to the active
/// [`Downloads`] into the libcurl C callbacks, which cannot capture it
/// directly.
mod tls {
    use std::cell::Cell;

    use super::Downloads;

    // Holds the `Downloads` pointer as a `usize`; `0` means "not installed".
    thread_local!(static PTR: Cell<usize> = const { Cell::new(0) });

    /// Runs `f` with the currently installed `Downloads`, if any.
    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            // SAFETY: a non-zero value is only ever written by `set`, which
            // keeps the referenced `Downloads` alive for the duration of its
            // closure and restores the previous value afterwards, all on
            // this same thread — so the pointer is valid here.
            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
        }
    }

    /// Installs `dl` as the active `Downloads` while `f` runs, restoring the
    /// previous value afterwards — including on panic, via the `Reset` guard.
    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
        // Drop guard that restores the cell's previous value.
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_, '_> as usize);
            f()
        })
    }
}