use std::cell::{Cell, Ref, RefCell, RefMut};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::hash;
use std::mem;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::time::{Duration, Instant};

use anyhow::Context as _;
use bytesize::ByteSize;
use cargo_util_schemas::manifest::RustVersion;
use curl::easy::Easy;
use curl::multi::{EasyHandle, Multi};
use lazycell::LazyCell;
use semver::Version;
use serde::Serialize;
use tracing::debug;

use crate::core::compiler::{CompileKind, RustcTargetData};
use crate::core::dependency::DepKind;
use crate::core::resolver::features::ForceAllTargets;
use crate::core::resolver::{HasDevUnits, Resolve};
use crate::core::{
    CliUnstable, Dependency, Features, Manifest, PackageId, PackageIdSpec, SerializedDependency,
    SourceId, Target,
};
use crate::core::{Summary, Workspace};
use crate::sources::source::{MaybePackage, SourceMap};
use crate::util::cache_lock::{CacheLock, CacheLockMode};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::http::http_handle_and_timeout;
use crate::util::network::http::HttpTimeout;
use crate::util::network::retry::{Retry, RetryResult};
use crate::util::network::sleep::SleepTracker;
use crate::util::{self, internal, GlobalContext, Progress, ProgressStyle};

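/// A Cargo package: a parsed manifest plus the path of the `Cargo.toml` it was
/// loaded from. Cloning is cheap because the data lives behind an `Rc`.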
#[derive(Clone)]
pub struct Package {
    inner: Rc<PackageInner>,
}

#[derive(Clone)]
struct PackageInner {
    manifest: Manifest,
    manifest_path: PathBuf,
}

impl Ord for Package {
    fn cmp(&self, other: &Package) -> Ordering {
        self.package_id().cmp(&other.package_id())
    }
}

impl PartialOrd for Package {
    fn partial_cmp(&self, other: &Package) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

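/// A JSON-serializable view of a package, built by [`Package::serialized`] and
/// used for machine-readable output.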
#[derive(Serialize)]
pub struct SerializedPackage {
    name: InternedString,
    version: Version,
    id: PackageIdSpec,
    license: Option<String>,
    license_file: Option<String>,
    description: Option<String>,
    source: SourceId,
    dependencies: Vec<SerializedDependency>,
    targets: Vec<Target>,
    features: BTreeMap<InternedString, Vec<InternedString>>,
    manifest_path: PathBuf,
    metadata: Option<toml::Value>,
    publish: Option<Vec<String>>,
    authors: Vec<String>,
    categories: Vec<String>,
    keywords: Vec<String>,
    readme: Option<String>,
    repository: Option<String>,
    homepage: Option<String>,
    documentation: Option<String>,
    edition: String,
    links: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    metabuild: Option<Vec<String>>,
    default_run: Option<String>,
    rust_version: Option<RustVersion>,
}

impl Package {
    pub fn new(manifest: Manifest, manifest_path: &Path) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest,
                manifest_path: manifest_path.to_path_buf(),
            }),
        }
    }

    pub fn dependencies(&self) -> &[Dependency] {
        self.manifest().dependencies()
    }
    pub fn manifest(&self) -> &Manifest {
        &self.inner.manifest
    }
    pub fn manifest_mut(&mut self) -> &mut Manifest {
        &mut Rc::make_mut(&mut self.inner).manifest
    }
    pub fn manifest_path(&self) -> &Path {
        &self.inner.manifest_path
    }
    pub fn name(&self) -> InternedString {
        self.package_id().name()
    }
    pub fn package_id(&self) -> PackageId {
        self.manifest().package_id()
    }
    pub fn root(&self) -> &Path {
        self.manifest_path().parent().unwrap()
    }
    pub fn summary(&self) -> &Summary {
        self.manifest().summary()
    }
    pub fn targets(&self) -> &[Target] {
        self.manifest().targets()
    }
    pub fn library(&self) -> Option<&Target> {
        self.targets().iter().find(|t| t.is_lib())
    }
    pub fn version(&self) -> &Version {
        self.package_id().version()
    }
    pub fn authors(&self) -> &Vec<String> {
        &self.manifest().metadata().authors
    }

    pub fn publish(&self) -> &Option<Vec<String>> {
        self.manifest().publish()
    }
    pub fn proc_macro(&self) -> bool {
        self.targets().iter().any(|target| target.proc_macro())
    }
    pub fn rust_version(&self) -> Option<&RustVersion> {
        self.manifest().rust_version()
    }

    pub fn has_custom_build(&self) -> bool {
        self.targets().iter().any(|t| t.is_custom_build())
    }

    pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package {
        Package {
            inner: Rc::new(PackageInner {
                manifest: self.manifest().clone().map_source(to_replace, replace_with),
                manifest_path: self.manifest_path().to_owned(),
            }),
        }
    }

    pub fn serialized(
        &self,
        unstable_flags: &CliUnstable,
        cargo_features: &Features,
    ) -> SerializedPackage {
        let summary = self.manifest().summary();
        let package_id = summary.package_id();
        let manmeta = self.manifest().metadata();
        let targets: Vec<Target> = self
            .manifest()
            .targets()
            .iter()
            .filter(|t| t.src_path().is_path())
            .cloned()
            .collect();
        let crate_features = summary
            .features()
            .iter()
            .map(|(k, v)| {
                (
                    *k,
                    v.iter()
                        .map(|fv| InternedString::new(&fv.to_string()))
                        .collect(),
                )
            })
            .collect();

        SerializedPackage {
            name: package_id.name(),
            version: package_id.version().clone(),
            id: package_id.to_spec(),
            license: manmeta.license.clone(),
            license_file: manmeta.license_file.clone(),
            description: manmeta.description.clone(),
            source: summary.source_id(),
            dependencies: summary
                .dependencies()
                .iter()
                .map(|dep| dep.serialized(unstable_flags, cargo_features))
                .collect(),
            targets,
            features: crate_features,
            manifest_path: self.manifest_path().to_path_buf(),
            metadata: self.manifest().custom_metadata().cloned(),
            authors: manmeta.authors.clone(),
            categories: manmeta.categories.clone(),
            keywords: manmeta.keywords.clone(),
            readme: manmeta.readme.clone(),
            repository: manmeta.repository.clone(),
            homepage: manmeta.homepage.clone(),
            documentation: manmeta.documentation.clone(),
            edition: self.manifest().edition().to_string(),
            links: self.manifest().links().map(|s| s.to_owned()),
            metabuild: self.manifest().metabuild().cloned(),
            publish: self.publish().as_ref().cloned(),
            default_run: self.manifest().default_run().map(|s| s.to_owned()),
            rust_version: self.rust_version().cloned(),
        }
    }
}

impl fmt::Display for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.summary().package_id())
    }
}

impl fmt::Debug for Package {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Package")
            .field("id", &self.summary().package_id())
            .field("..", &"..")
            .finish()
    }
}

impl PartialEq for Package {
    fn eq(&self, other: &Package) -> bool {
        self.package_id() == other.package_id()
    }
}

impl Eq for Package {}

impl hash::Hash for Package {
    fn hash<H: hash::Hasher>(&self, into: &mut H) {
        self.package_id().hash(into)
    }
}

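/// A set of packages, together with the sources they come from, which can
/// lazily download package contents over HTTP via a shared curl `Multi` handle.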
pub struct PackageSet<'gctx> {
    packages: HashMap<PackageId, LazyCell<Package>>,
    sources: RefCell<SourceMap<'gctx>>,
    gctx: &'gctx GlobalContext,
    multi: Multi,
    downloading: Cell<bool>,
    multiplexing: bool,
}

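/// The state of in-flight downloads for a [`PackageSet`]: pending and sleeping
/// (retrying) transfers, progress reporting, and timeout/speed bookkeeping.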
pub struct Downloads<'a, 'gctx> {
    set: &'a PackageSet<'gctx>,
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    pending_ids: HashSet<PackageId>,
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    results: Vec<(usize, Result<(), curl::Error>)>,
    next: usize,
    progress: RefCell<Option<Progress<'gctx>>>,
    downloads_finished: usize,
    downloaded_bytes: u64,
    largest: (u64, InternedString),
    start: Instant,
    success: bool,

    timeout: HttpTimeout,
    updated_at: Cell<Instant>,
    next_speed_check: Cell<Instant>,
    next_speed_check_bytes_threshold: Cell<u64>,
    _lock: CacheLock<'gctx>,
}

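/// Per-transfer state for a single package download driven through curl.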
struct Download<'gctx> {
    token: usize,

    id: PackageId,

    data: RefCell<Vec<u8>>,

    headers: RefCell<Vec<String>>,

    url: String,

    descriptor: String,

    total: Cell<u64>,
    current: Cell<u64>,

    start: Instant,
    timed_out: Cell<Option<String>>,

    retry: Retry<'gctx>,
}

impl<'gctx> PackageSet<'gctx> {
    pub fn new(
        package_ids: &[PackageId],
        sources: SourceMap<'gctx>,
        gctx: &'gctx GlobalContext,
    ) -> CargoResult<PackageSet<'gctx>> {
        let mut multi = Multi::new();
        let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true);
        multi
            .pipelining(false, multiplexing)
            .context("failed to enable multiplexing/pipelining in curl")?;

        multi.set_max_host_connections(2)?;

        Ok(PackageSet {
            packages: package_ids
                .iter()
                .map(|&id| (id, LazyCell::new()))
                .collect(),
            sources: RefCell::new(sources),
            gctx,
            multi,
            downloading: Cell::new(false),
            multiplexing,
        })
    }

    pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ {
        self.packages.keys().cloned()
    }

    pub fn packages(&self) -> impl Iterator<Item = &Package> {
        self.packages.values().filter_map(|p| p.borrow())
    }

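    /// Starts a download session for this set. Panics if a session is already
    /// active, and holds the package cache lock for the session's duration.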
    pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'gctx>> {
        assert!(!self.downloading.replace(true));
        let timeout = HttpTimeout::new(self.gctx)?;
        Ok(Downloads {
            start: Instant::now(),
            set: self,
            next: 0,
            pending: HashMap::new(),
            pending_ids: HashSet::new(),
            sleeping: SleepTracker::new(),
            results: Vec::new(),
            progress: RefCell::new(Some(Progress::with_style(
                "Downloading",
                ProgressStyle::Ratio,
                self.gctx,
            ))),
            downloads_finished: 0,
            downloaded_bytes: 0,
            largest: (0, InternedString::new("")),
            success: false,
            updated_at: Cell::new(Instant::now()),
            timeout,
            next_speed_check: Cell::new(Instant::now()),
            next_speed_check_bytes_threshold: Cell::new(0),
            _lock: self
                .gctx
                .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?,
        })
    }

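    /// Returns the [`Package`] for `id`, downloading it first if it is not
    /// already present in this set.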
    pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> {
        if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) {
            return Ok(pkg);
        }
        Ok(self.get_many(Some(id))?.remove(0))
    }

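    /// Downloads any of `ids` that are not yet available and returns references
    /// to all of the requested packages.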
    pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> {
        let mut pkgs = Vec::new();
        let _lock = self
            .gctx
            .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?;
        let mut downloads = self.enable_download()?;
        for id in ids {
            pkgs.extend(downloads.start(id)?);
        }
        while downloads.remaining() > 0 {
            pkgs.push(downloads.wait()?);
        }
        downloads.success = true;
        drop(downloads);

        let mut deferred = self.gctx.deferred_global_last_use()?;
        deferred.save_no_error(self.gctx);
        Ok(pkgs)
    }

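    /// Downloads every package reachable from `root_ids` in the resolve graph
    /// for the requested compile kinds, including kinds introduced by artifact
    /// dependencies with explicit targets.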
    #[tracing::instrument(skip_all)]
    pub fn download_accessible(
        &self,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'gctx>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        fn collect_used_deps(
            used: &mut BTreeSet<(PackageId, CompileKind)>,
            resolve: &Resolve,
            pkg_id: PackageId,
            has_dev_units: HasDevUnits,
            requested_kind: CompileKind,
            target_data: &RustcTargetData<'_>,
            force_all_targets: ForceAllTargets,
        ) -> CargoResult<()> {
            if !used.insert((pkg_id, requested_kind)) {
                return Ok(());
            }
            let requested_kinds = &[requested_kind];
            let filtered_deps = PackageSet::filter_deps(
                pkg_id,
                resolve,
                has_dev_units,
                requested_kinds,
                target_data,
                force_all_targets,
            );
            for (pkg_id, deps) in filtered_deps {
                collect_used_deps(
                    used,
                    resolve,
                    pkg_id,
                    has_dev_units,
                    requested_kind,
                    target_data,
                    force_all_targets,
                )?;
                let artifact_kinds = deps.iter().filter_map(|dep| {
                    Some(
                        dep.artifact()?
                            .target()?
                            .to_resolved_compile_kind(*requested_kinds.iter().next().unwrap()),
                    )
                });
                for artifact_kind in artifact_kinds {
                    collect_used_deps(
                        used,
                        resolve,
                        pkg_id,
                        has_dev_units,
                        artifact_kind,
                        target_data,
                        force_all_targets,
                    )?;
                }
            }
            Ok(())
        }

        let mut to_download = BTreeSet::new();

        for id in root_ids {
            for requested_kind in requested_kinds {
                collect_used_deps(
                    &mut to_download,
                    resolve,
                    *id,
                    has_dev_units,
                    *requested_kind,
                    target_data,
                    force_all_targets,
                )?;
            }
        }
        let to_download = to_download
            .into_iter()
            .map(|(p, _)| p)
            .collect::<BTreeSet<_>>();
        self.get_many(to_download.into_iter())?;
        Ok(())
    }

    pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps(
        &self,
        ws: &Workspace<'gctx>,
        resolve: &Resolve,
        root_ids: &[PackageId],
        has_dev_units: HasDevUnits,
        requested_kinds: &[CompileKind],
        target_data: &RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> CargoResult<()> {
        let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids
            .iter()
            .map(|&root_id| {
                let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps(
                    root_id,
                    resolve,
                    has_dev_units,
                    requested_kinds,
                    target_data,
                    force_all_targets,
                )
                .collect();

                let dep_pkgs_and_deps = dep_pkgs_to_deps
                    .into_iter()
                    .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib()))
                    .filter_map(|(dep_package_id, deps)| {
                        self.get_one(dep_package_id).ok().and_then(|dep_pkg| {
                            (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps))
                        })
                    })
                    .collect();
                (root_id, dep_pkgs_and_deps)
            })
            .collect();

        for (pkg_id, dep_pkgs) in no_lib_pkgs {
            for (_dep_pkg_without_lib_target, deps) in dep_pkgs {
                for dep in deps.iter().filter(|dep| {
                    dep.artifact()
                        .map(|artifact| artifact.is_lib())
                        .unwrap_or(true)
                }) {
                    ws.gctx().shell().warn(&format!(
                        "{} ignoring invalid dependency `{}` which is missing a lib target",
                        pkg_id,
                        dep.name_in_toml(),
                    ))?;
                }
            }
        }
        Ok(())
    }

    fn filter_deps<'a>(
        pkg_id: PackageId,
        resolve: &'a Resolve,
        has_dev_units: HasDevUnits,
        requested_kinds: &'a [CompileKind],
        target_data: &'a RustcTargetData<'_>,
        force_all_targets: ForceAllTargets,
    ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a {
        resolve
            .deps(pkg_id)
            .filter(move |&(_id, deps)| {
                deps.iter().any(|dep| {
                    if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No {
                        return false;
                    }
                    if force_all_targets == ForceAllTargets::No {
                        let activated = requested_kinds
                            .iter()
                            .chain(Some(&CompileKind::Host))
                            .any(|kind| target_data.dep_platform_activated(dep, *kind));
                        if !activated {
                            return false;
                        }
                    }
                    true
                })
            })
            .into_iter()
    }

    pub fn sources(&self) -> Ref<'_, SourceMap<'gctx>> {
        self.sources.borrow()
    }

    pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'gctx>> {
        self.sources.borrow_mut()
    }

    pub fn add_set(&mut self, set: PackageSet<'gctx>) {
        assert!(!self.downloading.get());
        assert!(!set.downloading.get());
        for (pkg_id, p_cell) in set.packages {
            self.packages.entry(pkg_id).or_insert(p_cell);
        }
        let mut sources = self.sources.borrow_mut();
        let other_sources = set.sources.into_inner();
        sources.add_source_map(other_sources);
    }
}

impl<'a, 'gctx> Downloads<'a, 'gctx> {
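    /// Starts downloading the package for `id`, returning it immediately if it
    /// is already cached or needs no network fetch; otherwise the transfer is
    /// enqueued, `None` is returned, and [`Downloads::wait`] completes it later.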
    #[tracing::instrument(skip_all)]
    pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        self.start_inner(id)
            .with_context(|| format!("failed to download `{}`", id))
    }

    fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> {
        let slot = self
            .set
            .packages
            .get(&id)
            .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?;
        if let Some(pkg) = slot.borrow() {
            return Ok(Some(pkg));
        }

        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?;
        let pkg = source
            .download(id)
            .context("unable to get packages from source")?;
        let (url, descriptor, authorization) = match pkg {
            MaybePackage::Ready(pkg) => {
                debug!("{} doesn't need a download", id);
                assert!(slot.fill(pkg).is_ok());
                return Ok(Some(slot.borrow().unwrap()));
            }
            MaybePackage::Download {
                url,
                descriptor,
                authorization,
            } => (url, descriptor, authorization),
        };

        let token = self.next;
        self.next += 1;
        debug!(target: "network", "downloading {} as {}", id, token);
        assert!(self.pending_ids.insert(id));

        let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?;
        handle.get(true)?;
        handle.url(&url)?;
        handle.follow_location(true)?;
        if let Some(authorization) = authorization {
            let mut headers = curl::easy::List::new();
            headers.append(&format!("Authorization: {}", authorization))?;
            handle.http_headers(headers)?;
        }

        crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle);

        handle.write_function(move |buf| {
            debug!(target: "network", "{} - {} bytes of data", token, buf.len());
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    downloads.pending[&token]
                        .0
                        .data
                        .borrow_mut()
                        .extend_from_slice(buf);
                }
            });
            Ok(buf.len())
        })?;
        handle.header_function(move |data| {
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    let h = String::from_utf8_lossy(data).trim().to_string();
                    downloads.pending[&token].0.headers.borrow_mut().push(h);
                }
            });
            true
        })?;

        handle.progress(true)?;
        handle.progress_function(move |dl_total, dl_cur, _, _| {
            tls::with(|downloads| match downloads {
                Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
                None => false,
            })
        })?;

        if self.downloads_finished == 0
            && self.pending.is_empty()
            && !self.progress.borrow().as_ref().unwrap().is_enabled()
        {
            self.set.gctx.shell().status("Downloading", "crates ...")?;
        }

        let dl = Download {
            token,
            data: RefCell::new(Vec::new()),
            headers: RefCell::new(Vec::new()),
            id,
            url,
            descriptor,
            total: Cell::new(0),
            current: Cell::new(0),
            start: Instant::now(),
            timed_out: Cell::new(None),
            retry: Retry::new(self.set.gctx)?,
        };
        self.enqueue(dl, handle)?;
        self.tick(WhyTick::DownloadStarted)?;

        Ok(None)
    }

    pub fn remaining(&self) -> usize {
        self.pending.len() + self.sleeping.len()
    }

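    /// Blocks until one pending download either completes or permanently
    /// fails, hands the downloaded bytes to the package's source via
    /// `finish_download`, and returns the resulting [`Package`].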
    #[tracing::instrument(skip_all)]
    pub fn wait(&mut self) -> CargoResult<&'a Package> {
        let (dl, data) = loop {
            assert_eq!(self.pending.len(), self.pending_ids.len());
            let (token, result) = self.wait_for_curl()?;
            debug!(target: "network", "{} finished with {:?}", token, result);

            let (mut dl, handle) = self
                .pending
                .remove(&token)
                .expect("got a token for a non-in-progress transfer");
            let data = mem::take(&mut *dl.data.borrow_mut());
            let headers = mem::take(&mut *dl.headers.borrow_mut());
            let mut handle = self.set.multi.remove(handle)?;
            self.pending_ids.remove(&dl.id);

            let ret = {
                let timed_out = &dl.timed_out;
                let url = &dl.url;
                dl.retry.r#try(|| {
                    if let Err(e) = result {
                        if !e.is_aborted_by_callback() {
                            return Err(e.into());
                        }

                        return Err(match timed_out.replace(None) {
                            Some(msg) => {
                                let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
                                let mut err = curl::Error::new(code);
                                err.set_extra(msg);
                                err
                            }
                            None => e,
                        }
                        .into());
                    }

                    let code = handle.response_code()?;
                    if code != 200 && code != 0 {
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            headers,
                        )
                        .into());
                    }
                    Ok(data)
                })
            };
            match ret {
                RetryResult::Success(data) => break (dl, data),
                RetryResult::Err(e) => {
                    return Err(e.context(format!("failed to download from `{}`", dl.url)))
                }
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {} for {sleep}ms", dl.url);
                    self.sleeping.push(sleep, (dl, handle));
                }
            }
        };

        self.progress.borrow_mut().as_mut().unwrap().clear();
        self.set.gctx.shell().status("Downloaded", &dl.descriptor)?;

        self.downloads_finished += 1;
        self.downloaded_bytes += dl.total.get();
        if dl.total.get() > self.largest.0 {
            self.largest = (dl.total.get(), dl.id.name());
        }

        if dl.total.get() < ByteSize::kb(400).0 {
            self.tick(WhyTick::DownloadFinished)?;
        } else {
            self.tick(WhyTick::Extracting(&dl.id.name()))?;
        }

        let mut sources = self.set.sources.borrow_mut();
        let source = sources
            .get_mut(dl.id.source_id())
            .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
        let start = Instant::now();
        let pkg = source.finish_download(dl.id, data)?;

        let finish_dur = start.elapsed();
        self.updated_at.set(self.updated_at.get() + finish_dur);
        self.next_speed_check
            .set(self.next_speed_check.get() + finish_dur);

        let slot = &self.set.packages[&dl.id];
        assert!(slot.fill(pkg).is_ok());
        Ok(slot.borrow().unwrap())
    }

    fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> {
        let mut handle = self.set.multi.add(handle)?;
        let now = Instant::now();
        handle.set_token(dl.token)?;
        self.updated_at.set(now);
        self.next_speed_check.set(now + self.timeout.dur);
        self.next_speed_check_bytes_threshold
            .set(u64::from(self.timeout.low_speed_limit));
        dl.timed_out.set(None);
        dl.current.set(0);
        dl.total.set(0);
        self.pending.insert(dl.token, (dl, handle));
        Ok(())
    }

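    /// Drives the curl `Multi` session (re-enqueueing any sleeping retries)
    /// until some transfer reports a result, waiting on socket activity or
    /// sleeping until the next retry is due in the meantime.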
    fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
        loop {
            self.add_sleepers()?;
            let n = tls::set(self, || {
                self.set
                    .multi
                    .perform()
                    .context("failed to perform http requests")
            })?;
            debug!(target: "network", "handles remaining: {}", n);
            let results = &mut self.results;
            let pending = &self.pending;
            self.set.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let handle = &pending[&token].1;
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                } else {
                    debug!(target: "network", "message without a result (?)");
                }
            });

            if let Some(pair) = results.pop() {
                break Ok(pair);
            }
            assert_ne!(self.remaining(), 0);
            if self.pending.is_empty() {
                let delay = self.sleeping.time_to_next().unwrap();
                debug!(target: "network", "sleeping main thread for {delay:?}");
                std::thread::sleep(delay);
            } else {
                let min_timeout = Duration::new(1, 0);
                let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
                let timeout = timeout.min(min_timeout);
                self.set
                    .multi
                    .wait(&mut [], timeout)
                    .context("failed to wait on curl `Multi`")?;
            }
        }
    }

    fn add_sleepers(&mut self) -> CargoResult<()> {
        for (dl, handle) in self.sleeping.to_retry() {
            self.pending_ids.insert(dl.id);
            self.enqueue(dl, handle)?;
        }
        Ok(())
    }

    fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
        let dl = &self.pending[&token].0;
        dl.total.set(total);
        let now = Instant::now();
        if cur > dl.current.get() {
            let delta = cur - dl.current.get();
            let threshold = self.next_speed_check_bytes_threshold.get();

            dl.current.set(cur);
            self.updated_at.set(now);

            if delta >= threshold {
                self.next_speed_check.set(now + self.timeout.dur);
                self.next_speed_check_bytes_threshold
                    .set(u64::from(self.timeout.low_speed_limit));
            } else {
                self.next_speed_check_bytes_threshold.set(threshold - delta);
            }
        }
        if self.tick(WhyTick::DownloadUpdate).is_err() {
            return false;
        }

        if now > self.updated_at.get() + self.timeout.dur {
            self.updated_at.set(now);
            let msg = format!(
                "failed to download any data for `{}` within {}s",
                dl.id,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        if now >= self.next_speed_check.get() {
            self.next_speed_check.set(now + self.timeout.dur);
            assert!(self.next_speed_check_bytes_threshold.get() > 0);
            let msg = format!(
                "download of `{}` failed to transfer more \
                 than {} bytes in {}s",
                dl.id,
                self.timeout.low_speed_limit,
                self.timeout.dur.as_secs()
            );
            dl.timed_out.set(Some(msg));
            return false;
        }

        true
    }

    fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> {
        let mut progress = self.progress.borrow_mut();
        let progress = progress.as_mut().unwrap();

        if let WhyTick::DownloadUpdate = why {
            if !progress.update_allowed() {
                return Ok(());
            }
        }
        let pending = self.remaining();
        let mut msg = if pending == 1 {
            format!("{} crate", pending)
        } else {
            format!("{} crates", pending)
        };
        match why {
            WhyTick::Extracting(krate) => {
                msg.push_str(&format!(", extracting {} ...", krate));
            }
            _ => {
                let mut dur = Duration::new(0, 0);
                let mut remaining = 0;
                for (dl, _) in self.pending.values() {
                    dur += dl.start.elapsed();
                    if dl.total.get() >= dl.current.get() {
                        remaining += dl.total.get() - dl.current.get();
                    }
                }
                if remaining > 0 && dur > Duration::from_millis(500) {
                    msg.push_str(&format!(", remaining bytes: {}", ByteSize(remaining)));
                }
            }
        }
        progress.print_now(&msg)
    }
}

#[derive(Copy, Clone)]
enum WhyTick<'a> {
    DownloadStarted,
    DownloadUpdate,
    DownloadFinished,
    Extracting(&'a str),
}

impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> {
    fn drop(&mut self) {
        self.set.downloading.set(false);
        let progress = self.progress.get_mut().take().unwrap();
        if !progress.is_enabled() {
            return;
        }
        if self.downloads_finished == 0 {
            return;
        }
        if !self.success {
            return;
        }
        let crate_string = if self.downloads_finished == 1 {
            "crate"
        } else {
            "crates"
        };
        let mut status = format!(
            "{} {} ({}) in {}",
            self.downloads_finished,
            crate_string,
            ByteSize(self.downloaded_bytes),
            util::elapsed(self.start.elapsed())
        );
        if self.largest.0 > ByteSize::mb(1).0 && self.downloads_finished > 1 {
            status.push_str(&format!(
                " (largest was `{}` at {})",
                self.largest.1,
                ByteSize(self.largest.0),
            ));
        }
        drop(progress);
        drop(self.set.gctx.shell().status("Downloaded", status));
    }
}

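/// Thread-local storage used to hand the active [`Downloads`] to the curl
/// callbacks, which cannot capture a direct borrow of it.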
mod tls {
    use std::cell::Cell;

    use super::Downloads;

    thread_local!(static PTR: Cell<usize> = Cell::new(0));

    pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) }
        }
    }

    pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R {
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_, '_> as usize);
            f()
        })
    }
}