1use crate::core::global_cache_tracker;
4use crate::core::{PackageId, SourceId};
5use crate::sources::registry::download;
6use crate::sources::registry::MaybeLock;
7use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
8use crate::util::cache_lock::CacheLockMode;
9use crate::util::errors::{CargoResult, HttpNotSuccessful};
10use crate::util::interning::InternedString;
11use crate::util::network::http::http_handle;
12use crate::util::network::retry::{Retry, RetryResult};
13use crate::util::network::sleep::SleepTracker;
14use crate::util::{auth, Filesystem, GlobalContext, IntoUrl, Progress, ProgressStyle};
15use anyhow::Context as _;
16use cargo_credential::Operation;
17use cargo_util::paths;
18use curl::easy::{Easy, List};
19use curl::multi::{EasyHandle, Multi};
20use std::cell::RefCell;
21use std::collections::{HashMap, HashSet};
22use std::fs::{self, File};
23use std::io::ErrorKind;
24use std::path::{Path, PathBuf};
25use std::str;
26use std::task::{ready, Poll};
27use std::time::Duration;
28use tracing::{debug, trace};
29use url::Url;
30
// HTTP header names. They must stay lowercase: the header callback lowercases each
// incoming header name before matching it against these constants.

/// `ETag` response header; also the key used in an `etag: …` index version string.
const ETAG: &str = "etag";
/// `Last-Modified` response header; also the key used in a `last-modified: …` index version.
const LAST_MODIFIED: &str = "last-modified";
/// `WWW-Authenticate` response header, inspected for a `Cargo` challenge after a 401.
const WWW_AUTHENTICATE: &str = "www-authenticate";
/// Conditional request header sent when the cached copy has an ETag.
const IF_NONE_MATCH: &str = "if-none-match";
/// Conditional request header sent when the cached copy has a Last-Modified date.
const IF_MODIFIED_SINCE: &str = "if-modified-since";

/// Index version recorded when a response carried neither an ETag nor a Last-Modified header.
const UNKNOWN: &str = "Unknown";
39
40pub struct HttpRegistry<'gctx> {
57 name: InternedString,
60 index_path: Filesystem,
66 cache_path: Filesystem,
68 source_id: SourceId,
70 gctx: &'gctx GlobalContext,
71
72 url: Url,
74
75 multi: Multi,
77
78 requested_update: bool,
82
83 downloads: Downloads<'gctx>,
85
86 multiplexing: bool,
88
89 fresh: HashSet<PathBuf>,
93
94 fetch_started: bool,
96
97 registry_config: Option<RegistryConfig>,
99
100 auth_required: bool,
102
103 login_url: Option<Url>,
105
106 auth_error_headers: Vec<String>,
108
109 quiet: bool,
111}
112
113struct Downloads<'gctx> {
115 pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
119 pending_paths: HashSet<PathBuf>,
122 sleeping: SleepTracker<(Download<'gctx>, Easy)>,
124 results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
126 next: usize,
128 progress: RefCell<Option<Progress<'gctx>>>,
130 downloads_finished: usize,
132 blocking_calls: usize,
135}
136
137struct Download<'gctx> {
139 token: usize,
142
143 path: PathBuf,
145
146 data: RefCell<Vec<u8>>,
148
149 header_map: RefCell<Headers>,
151
152 retry: Retry<'gctx>,
154}
155
/// Response headers captured for one request by the curl header callback.
#[derive(Default)]
struct Headers {
    /// Every parsed header, formatted as `"{tag}: {value}"`, in arrival order.
    all: Vec<String>,
    /// Value of the `etag` header, if one was received.
    etag: Option<String>,
    /// Value of the `last-modified` header, if one was received.
    last_modified: Option<String>,
    /// Values of every `www-authenticate` header, in arrival order.
    www_authenticate: Vec<String>,
}
165
/// HTTP statuses the index loader handles explicitly.
///
/// Any other status is converted into an error before a `StatusCode` is produced.
enum StatusCode {
    /// HTTP 200: the body is the current file contents.
    Success,
    /// HTTP 304: the locally cached copy is still current.
    NotModified,
    /// HTTP 401: credentials are missing or were rejected.
    Unauthorized,
    /// HTTP 404, 410 or 451: the file is not available.
    NotFound,
}
173
174struct CompletedDownload {
179 response_code: StatusCode,
180 data: Vec<u8>,
181 header_map: Headers,
182}
183
184impl<'gctx> HttpRegistry<'gctx> {
185 pub fn new(
190 source_id: SourceId,
191 gctx: &'gctx GlobalContext,
192 name: &str,
193 ) -> CargoResult<HttpRegistry<'gctx>> {
194 let url = source_id.url().as_str();
195 if !url.ends_with('/') {
197 anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
198 }
199 assert!(source_id.is_sparse());
200 let url = url
201 .strip_prefix("sparse+")
202 .expect("sparse registry needs sparse+ prefix")
203 .into_url()
204 .expect("a url with the sparse+ stripped should still be valid");
205
206 Ok(HttpRegistry {
207 name: name.into(),
208 index_path: gctx.registry_index_path().join(name),
209 cache_path: gctx.registry_cache_path().join(name),
210 source_id,
211 gctx,
212 url,
213 multi: Multi::new(),
214 multiplexing: false,
215 downloads: Downloads {
216 next: 0,
217 pending: HashMap::new(),
218 pending_paths: HashSet::new(),
219 sleeping: SleepTracker::new(),
220 results: HashMap::new(),
221 progress: RefCell::new(Some(Progress::with_style(
222 "Fetch",
223 ProgressStyle::Indeterminate,
224 gctx,
225 ))),
226 downloads_finished: 0,
227 blocking_calls: 0,
228 },
229 fresh: HashSet::new(),
230 requested_update: false,
231 fetch_started: false,
232 registry_config: None,
233 auth_required: false,
234 login_url: None,
235 auth_error_headers: vec![],
236 quiet: false,
237 })
238 }
239
240 fn handle_http_header(buf: &[u8]) -> Option<(&str, &str)> {
242 if buf.is_empty() {
243 return None;
244 }
245 let buf = std::str::from_utf8(buf).ok()?.trim_end();
246 if buf.contains('\n') {
248 return None;
249 }
250 let (tag, value) = buf.split_once(':')?;
251 let value = value.trim();
252 Some((tag, value))
253 }
254
255 fn start_fetch(&mut self) -> CargoResult<()> {
259 if self.fetch_started {
260 return Ok(());
262 }
263 self.fetch_started = true;
264
265 self.multiplexing = self.gctx.http_config()?.multiplexing.unwrap_or(true);
268
269 self.multi
270 .pipelining(false, self.multiplexing)
271 .context("failed to enable multiplexing/pipelining in curl")?;
272
273 self.multi.set_max_host_connections(2)?;
275
276 if !self.quiet {
277 self.gctx
278 .shell()
279 .status("Updating", self.source_id.display_index())?;
280 }
281
282 Ok(())
283 }
284
285 fn handle_completed_downloads(&mut self) -> CargoResult<()> {
288 assert_eq!(
289 self.downloads.pending.len(),
290 self.downloads.pending_paths.len()
291 );
292
293 let results = {
295 let mut results = Vec::new();
296 let pending = &mut self.downloads.pending;
297 self.multi.messages(|msg| {
298 let token = msg.token().expect("failed to read token");
299 let (_, handle) = &pending[&token];
300 if let Some(result) = msg.result_for(handle) {
301 results.push((token, result));
302 };
303 });
304 results
305 };
306 for (token, result) in results {
307 let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
308 let was_present = self.downloads.pending_paths.remove(&download.path);
309 assert!(
310 was_present,
311 "expected pending_paths to contain {:?}",
312 download.path
313 );
314 let mut handle = self.multi.remove(handle)?;
315 let data = download.data.take();
316 let url = self.full_url(&download.path);
317 let result = match download.retry.r#try(|| {
318 result.with_context(|| format!("failed to download from `{}`", url))?;
319 let code = handle.response_code()?;
320 let code = match code {
322 200 => StatusCode::Success,
323 304 => StatusCode::NotModified,
324 401 => StatusCode::Unauthorized,
325 404 | 410 | 451 => StatusCode::NotFound,
326 _ => {
327 return Err(HttpNotSuccessful::new_from_handle(
328 &mut handle,
329 &url,
330 data,
331 download.header_map.take().all,
332 )
333 .into());
334 }
335 };
336 Ok((data, code))
337 }) {
338 RetryResult::Success((data, code)) => Ok(CompletedDownload {
339 response_code: code,
340 data,
341 header_map: download.header_map.take(),
342 }),
343 RetryResult::Err(e) => Err(e),
344 RetryResult::Retry(sleep) => {
345 debug!(target: "network", "download retry {:?} for {sleep}ms", download.path);
346 self.downloads.sleeping.push(sleep, (download, handle));
347 continue;
348 }
349 };
350
351 self.downloads.results.insert(download.path, result);
352 self.downloads.downloads_finished += 1;
353 }
354
355 self.downloads.tick()?;
356
357 Ok(())
358 }
359
360 fn full_url(&self, path: &Path) -> String {
362 format!("{}{}", self.url, path.display())
364 }
365
366 fn is_fresh(&self, path: &Path) -> bool {
370 if !self.requested_update {
371 trace!(
372 "using local {} as user did not request update",
373 path.display()
374 );
375 true
376 } else if self.gctx.cli_unstable().no_index_update {
377 trace!("using local {} in no_index_update mode", path.display());
378 true
379 } else if self.gctx.offline() {
380 trace!("using local {} in offline mode", path.display());
381 true
382 } else if self.fresh.contains(path) {
383 trace!("using local {} as it was already fetched", path.display());
384 true
385 } else {
386 debug!("checking freshness of {}", path.display());
387 false
388 }
389 }
390
391 fn config_cached(&mut self) -> CargoResult<Option<&RegistryConfig>> {
393 if self.registry_config.is_some() {
394 return Ok(self.registry_config.as_ref());
395 }
396 let config_json_path = self
397 .assert_index_locked(&self.index_path)
398 .join(RegistryConfig::NAME);
399 match fs::read(&config_json_path) {
400 Ok(raw_data) => match serde_json::from_slice(&raw_data) {
401 Ok(json) => {
402 self.registry_config = Some(json);
403 }
404 Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
405 },
406 Err(e) => {
407 if e.kind() != ErrorKind::NotFound {
408 tracing::debug!("failed to read config.json cache: {}", e)
409 }
410 }
411 }
412 Ok(self.registry_config.as_ref())
413 }
414
415 fn config(&mut self) -> Poll<CargoResult<&RegistryConfig>> {
417 debug!("loading config");
418 let index_path = self.assert_index_locked(&self.index_path);
419 let config_json_path = index_path.join(RegistryConfig::NAME);
420 if self.is_fresh(Path::new(RegistryConfig::NAME)) && self.config_cached()?.is_some() {
421 return Poll::Ready(Ok(self.registry_config.as_ref().unwrap()));
422 }
423
424 match ready!(self.load(Path::new(""), Path::new(RegistryConfig::NAME), None)?) {
425 LoadResponse::Data {
426 raw_data,
427 index_version: _,
428 } => {
429 trace!("config loaded");
430 self.registry_config = Some(serde_json::from_slice(&raw_data)?);
431 if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
432 if let Err(e) = fs::write(&config_json_path, &raw_data) {
433 tracing::debug!("failed to write config.json cache: {}", e);
434 }
435 }
436 Poll::Ready(Ok(self.registry_config.as_ref().unwrap()))
437 }
438 LoadResponse::NotFound => {
439 Poll::Ready(Err(anyhow::anyhow!("config.json not found in registry")))
440 }
441 LoadResponse::CacheValid => Poll::Ready(Err(crate::util::internal(
442 "config.json is never stored in the index cache",
443 ))),
444 }
445 }
446
447 fn add_sleepers(&mut self) -> CargoResult<()> {
449 for (dl, handle) in self.downloads.sleeping.to_retry() {
450 let mut handle = self.multi.add(handle)?;
451 handle.set_token(dl.token)?;
452 let is_new = self.downloads.pending_paths.insert(dl.path.to_path_buf());
453 assert!(is_new, "path queued for download more than once");
454 let previous = self.downloads.pending.insert(dl.token, (dl, handle));
455 assert!(previous.is_none(), "dl token queued more than once");
456 }
457 Ok(())
458 }
459}
460
461impl<'gctx> RegistryData for HttpRegistry<'gctx> {
462 fn prepare(&self) -> CargoResult<()> {
463 self.gctx
464 .deferred_global_last_use()?
465 .mark_registry_index_used(global_cache_tracker::RegistryIndex {
466 encoded_registry_name: self.name,
467 });
468 Ok(())
469 }
470
471 fn index_path(&self) -> &Filesystem {
472 &self.index_path
473 }
474
475 fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
476 self.gctx
477 .assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
478 }
479
480 fn is_updated(&self) -> bool {
481 self.requested_update
482 }
483
484 fn load(
485 &mut self,
486 _root: &Path,
487 path: &Path,
488 index_version: Option<&str>,
489 ) -> Poll<CargoResult<LoadResponse>> {
490 trace!("load: {}", path.display());
491 if let Some(_token) = self.downloads.pending_paths.get(path) {
492 debug!("dependency is still pending: {}", path.display());
493 return Poll::Pending;
494 }
495
496 if let Some(index_version) = index_version {
497 trace!(
498 "local cache of {} is available at version `{}`",
499 path.display(),
500 index_version
501 );
502 if self.is_fresh(path) {
503 return Poll::Ready(Ok(LoadResponse::CacheValid));
504 }
505 } else if self.fresh.contains(path) {
506 debug!(
508 "cache did not contain previously downloaded file {}",
509 path.display()
510 );
511 return Poll::Ready(Ok(LoadResponse::NotFound));
512 }
513
514 if self.gctx.offline() || self.gctx.cli_unstable().no_index_update {
515 return Poll::Ready(Ok(LoadResponse::NotFound));
519 }
520
521 if let Some(result) = self.downloads.results.remove(path) {
522 let result =
523 result.with_context(|| format!("download of {} failed", path.display()))?;
524
525 let is_new = self.fresh.insert(path.to_path_buf());
526 assert!(
527 is_new,
528 "downloaded the index file `{}` twice",
529 path.display()
530 );
531
532 match result.response_code {
535 StatusCode::Success => {
536 let response_index_version = if let Some(etag) = result.header_map.etag {
537 format!("{}: {}", ETAG, etag)
538 } else if let Some(lm) = result.header_map.last_modified {
539 format!("{}: {}", LAST_MODIFIED, lm)
540 } else {
541 UNKNOWN.to_string()
542 };
543 trace!("index file version: {}", response_index_version);
544 return Poll::Ready(Ok(LoadResponse::Data {
545 raw_data: result.data,
546 index_version: Some(response_index_version),
547 }));
548 }
549 StatusCode::NotModified => {
550 if index_version.is_none() {
552 return Poll::Ready(Err(anyhow::anyhow!(
553 "server said not modified (HTTP 304) when no local cache exists"
554 )));
555 }
556 return Poll::Ready(Ok(LoadResponse::CacheValid));
557 }
558 StatusCode::NotFound => {
559 return Poll::Ready(Ok(LoadResponse::NotFound));
561 }
562 StatusCode::Unauthorized
563 if !self.auth_required && path == Path::new(RegistryConfig::NAME) =>
564 {
565 debug!(target: "network", "re-attempting request for config.json with authorization included.");
566 self.fresh.remove(path);
567 self.auth_required = true;
568
569 for header in &result.header_map.www_authenticate {
571 for challenge in http_auth::ChallengeParser::new(header) {
572 match challenge {
573 Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
574 for (param, value) in challenge.params {
576 if param.eq_ignore_ascii_case("login_url") {
577 self.login_url = Some(value.to_unescaped().into_url()?);
578 }
579 }
580 }
581 Ok(challenge) => {
582 debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
583 }
584 Err(e) => {
585 debug!(target: "network", "failed to parse challenge: {}", e)
586 }
587 }
588 }
589 }
590 self.auth_error_headers = result.header_map.all;
591 }
592 StatusCode::Unauthorized => {
593 let err = Err(HttpNotSuccessful {
594 code: 401,
595 body: result.data,
596 url: self.full_url(path),
597 ip: None,
598 headers: result.header_map.all,
599 }
600 .into());
601 if self.auth_required {
602 let auth_error = auth::AuthorizationError::new(
603 self.gctx,
604 self.source_id,
605 self.login_url.clone(),
606 auth::AuthorizationErrorReason::TokenRejected,
607 )?;
608 return Poll::Ready(err.context(auth_error));
609 } else {
610 return Poll::Ready(err);
611 }
612 }
613 }
614 }
615
616 if path != Path::new(RegistryConfig::NAME) {
617 self.auth_required = ready!(self.config()?).auth_required;
618 } else if !self.auth_required {
619 if let Some(config) = self.config_cached()? {
622 self.auth_required = config.auth_required;
623 }
624 }
625
626 self.start_fetch()?;
628
629 let mut handle = http_handle(self.gctx)?;
630 let full_url = self.full_url(path);
631 debug!(target: "network", "fetch {}", full_url);
632 handle.get(true)?;
633 handle.url(&full_url)?;
634 handle.follow_location(true)?;
635
636 crate::try_old_curl_http2_pipewait!(self.multiplexing, handle);
638
639 let mut headers = List::new();
640 headers.append("cargo-protocol: version=1")?;
643 headers.append("accept: text/plain")?;
644
645 if let Some(index_version) = index_version {
647 if let Some((key, value)) = index_version.split_once(':') {
648 match key {
649 ETAG => headers.append(&format!("{}: {}", IF_NONE_MATCH, value.trim()))?,
650 LAST_MODIFIED => {
651 headers.append(&format!("{}: {}", IF_MODIFIED_SINCE, value.trim()))?
652 }
653 _ => debug!("unexpected index version: {}", index_version),
654 }
655 }
656 }
657 if self.auth_required {
658 let authorization = auth::auth_token(
659 self.gctx,
660 &self.source_id,
661 self.login_url.as_ref(),
662 Operation::Read,
663 self.auth_error_headers.clone(),
664 true,
665 )?;
666 headers.append(&format!("Authorization: {}", authorization))?;
667 trace!(target: "network", "including authorization for {}", full_url);
668 }
669 handle.http_headers(headers)?;
670
671 let token = self.downloads.next;
675 self.downloads.next += 1;
676 debug!(target: "network", "downloading {} as {}", path.display(), token);
677 let is_new = self.downloads.pending_paths.insert(path.to_path_buf());
678 assert!(is_new, "path queued for download more than once");
679
680 handle.write_function(move |buf| {
685 trace!(target: "network", "{} - {} bytes of data", token, buf.len());
686 tls::with(|downloads| {
687 if let Some(downloads) = downloads {
688 downloads.pending[&token]
689 .0
690 .data
691 .borrow_mut()
692 .extend_from_slice(buf);
693 }
694 });
695 Ok(buf.len())
696 })?;
697
698 handle.header_function(move |buf| {
700 if let Some((tag, value)) = Self::handle_http_header(buf) {
701 tls::with(|downloads| {
702 if let Some(downloads) = downloads {
703 let mut header_map = downloads.pending[&token].0.header_map.borrow_mut();
704 header_map.all.push(format!("{tag}: {value}"));
705 match tag.to_ascii_lowercase().as_str() {
706 LAST_MODIFIED => header_map.last_modified = Some(value.to_string()),
707 ETAG => header_map.etag = Some(value.to_string()),
708 WWW_AUTHENTICATE => header_map.www_authenticate.push(value.to_string()),
709 _ => {}
710 }
711 }
712 });
713 }
714
715 true
716 })?;
717
718 let dl = Download {
719 token,
720 path: path.to_path_buf(),
721 data: RefCell::new(Vec::new()),
722 header_map: Default::default(),
723 retry: Retry::new(self.gctx)?,
724 };
725
726 let mut handle = self.multi.add(handle)?;
728 handle.set_token(token)?;
729 self.downloads.pending.insert(dl.token, (dl, handle));
730
731 Poll::Pending
732 }
733
734 fn config(&mut self) -> Poll<CargoResult<Option<RegistryConfig>>> {
735 let cfg = ready!(self.config()?).clone();
736 Poll::Ready(Ok(Some(cfg)))
737 }
738
739 fn invalidate_cache(&mut self) {
740 debug!("invalidated index cache");
744 self.fresh.clear();
745 self.requested_update = true;
746 }
747
748 fn set_quiet(&mut self, quiet: bool) {
749 self.quiet = quiet;
750 self.downloads.progress.replace(None);
751 }
752
753 fn download(&mut self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
754 let registry_config = loop {
755 match self.config()? {
756 Poll::Pending => self.block_until_ready()?,
757 Poll::Ready(cfg) => break cfg.to_owned(),
758 }
759 };
760
761 download::download(
762 &self.cache_path,
763 &self.gctx,
764 self.name.clone(),
765 pkg,
766 checksum,
767 registry_config,
768 )
769 }
770
771 fn finish_download(
772 &mut self,
773 pkg: PackageId,
774 checksum: &str,
775 data: &[u8],
776 ) -> CargoResult<File> {
777 download::finish_download(
778 &self.cache_path,
779 &self.gctx,
780 self.name.clone(),
781 pkg,
782 checksum,
783 data,
784 )
785 }
786
787 fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
788 download::is_crate_downloaded(&self.cache_path, &self.gctx, pkg)
789 }
790
791 fn block_until_ready(&mut self) -> CargoResult<()> {
792 trace!(target: "network::HttpRegistry::block_until_ready",
793 "{} transfers pending",
794 self.downloads.pending.len()
795 );
796 self.downloads.blocking_calls += 1;
797
798 loop {
799 let remaining_in_multi = tls::set(&self.downloads, || {
800 self.multi
801 .perform()
802 .context("failed to perform http requests")
803 })?;
804 trace!(target: "network", "{} transfers remaining", remaining_in_multi);
805 self.handle_completed_downloads()?;
809 if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
810 return Ok(());
811 }
812 self.add_sleepers()?;
815
816 if self.downloads.pending.is_empty() {
817 let delay = self.downloads.sleeping.time_to_next().unwrap();
818 debug!(target: "network", "sleeping main thread for {delay:?}");
819 std::thread::sleep(delay);
820 } else {
821 let timeout = self
824 .multi
825 .get_timeout()?
826 .unwrap_or_else(|| Duration::new(1, 0));
827 self.multi
828 .wait(&mut [], timeout)
829 .context("failed to wait on curl `Multi`")?;
830 }
831 }
832 }
833}
834
835impl<'gctx> Downloads<'gctx> {
836 fn tick(&self) -> CargoResult<()> {
838 let mut progress = self.progress.borrow_mut();
839 let Some(progress) = progress.as_mut() else {
840 return Ok(());
841 };
842
843 let approximate_tree_depth = 10;
855
856 progress.tick(
857 self.blocking_calls.min(approximate_tree_depth),
858 approximate_tree_depth + 1,
859 &format!(
860 " {} complete; {} pending",
861 self.downloads_finished,
862 self.pending.len() + self.sleeping.len()
863 ),
864 )
865 }
866}
867
/// Thread-local hand-off of the `Downloads` state to curl callbacks.
///
/// The closures given to curl's `write_function` and `header_function` must be
/// `'static`, so they cannot borrow `HttpRegistry::downloads` directly. Instead,
/// `block_until_ready` wraps `Multi::perform` in `set`, and the callbacks call `with`
/// to reach the `Downloads` for the duration of that `perform` call.
mod tls {
    use super::Downloads;
    use std::cell::Cell;

    // Address of the `Downloads` installed by the innermost active `set` on this
    // thread, or 0 when none is installed.
    thread_local!(static PTR: Cell<usize> = Cell::new(0));

    /// Calls `f` with the `Downloads` installed by an enclosing `set` on this thread,
    /// or with `None` if no `set` is active.
    pub(super) fn with<R>(f: impl FnOnce(Option<&Downloads<'_>>) -> R) -> R {
        let ptr = PTR.with(|p| p.get());
        if ptr == 0 {
            f(None)
        } else {
            // SAFETY: a non-zero value in PTR is only ever stored by `set`, from a
            // `&Downloads` that stays borrowed for the whole call to `set`'s closure.
            // The `Reset` guard restores the previous value when `set` returns or
            // unwinds, so a non-zero pointer is only observed while that borrow is live.
            // `f` is higher-ranked over the reference's lifetime, so it cannot let the
            // reference escape this call.
            let ptr = unsafe { &*(ptr as *const Downloads<'_>) };
            f(Some(ptr))
        }
    }

    /// Runs `f` with `dl` installed as this thread's current `Downloads`, then restores
    /// whatever was installed before, even if `f` panics.
    pub(super) fn set<R>(dl: &Downloads<'_>, f: impl FnOnce() -> R) -> R {
        // Drop guard that writes the saved value back into the cell.
        struct Reset<'a, T: Copy>(&'a Cell<T>, T);

        impl<'a, T: Copy> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                self.0.set(self.1);
            }
        }

        PTR.with(|p| {
            let _reset = Reset(p, p.get());
            p.set(dl as *const Downloads<'_> as usize);
            f()
        })
    }
}