// cargo/sources/registry/http_remote.rs

//! Access to an HTTP-based crate registry. See [`HttpRegistry`] for details.
2
3use crate::core::global_cache_tracker;
4use crate::core::{PackageId, SourceId};
5use crate::sources::registry::MaybeLock;
6use crate::sources::registry::download;
7use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
8use crate::util::cache_lock::CacheLockMode;
9use crate::util::errors::{CargoResult, HttpNotSuccessful};
10use crate::util::interning::InternedString;
11use crate::util::network::http::http_handle;
12use crate::util::network::retry::{Retry, RetryResult};
13use crate::util::network::sleep::SleepTracker;
14use crate::util::{Filesystem, GlobalContext, IntoUrl, Progress, ProgressStyle, auth};
15use anyhow::Context as _;
16use cargo_credential::Operation;
17use cargo_util::paths;
18use curl::easy::{Easy, List};
19use curl::multi::{EasyHandle, Multi};
20use std::cell::RefCell;
21use std::collections::{HashMap, HashSet};
22use std::fs::{self, File};
23use std::io::ErrorKind;
24use std::path::{Path, PathBuf};
25use std::str;
26use std::task::{Poll, ready};
27use std::time::Duration;
28use tracing::{debug, trace};
29use url::Url;
30
// HTTP header names this module reads or sends. Response header names are
// lower-cased before being compared against these, so they must stay
// lower-case. `ETAG` and `LAST_MODIFIED` double as the `key` in the
// `key: value` index-version strings produced for cached index files.

/// Response header carrying the entity tag of an index file.
const ETAG: &str = "etag";
/// Response header carrying the last modification time of an index file.
const LAST_MODIFIED: &str = "last-modified";
/// Response header carrying authentication challenges (sent with HTTP 401).
const WWW_AUTHENTICATE: &str = "www-authenticate";
/// Conditional request header used when a cached copy has an `ETag` version.
const IF_NONE_MATCH: &str = "if-none-match";
/// Conditional request header used when a cached copy has a `Last-Modified` version.
const IF_MODIFIED_SINCE: &str = "if-modified-since";

/// Index version recorded when a response has neither an `ETag` nor a
/// `Last-Modified` header.
const UNKNOWN: &str = "Unknown";
39
/// A registry served by the HTTP-based registry API.
///
/// This type is primarily accessed through the [`RegistryData`] trait.
///
/// `HttpRegistry` implements the HTTP-based registry API outlined in [RFC 2789]. Read the RFC for
/// the complete protocol, but _roughly_ the implementation loads each index file (e.g.,
/// config.json or re/ge/regex) from an HTTP service rather than from a locally cloned git
/// repository. The remote service can more or less be a static file server that simply serves the
/// contents of the origin git repository.
///
/// Implemented naively, this leads to a significant amount of network traffic, as a lookup of any
/// index file would need to check with the remote backend if the index file has changed. This
/// cost is somewhat mitigated by the use of HTTP conditional fetches (`If-Modified-Since` and
/// `If-None-Match` for `ETag`s) which can be efficiently handled by HTTP/2.
///
/// [RFC 2789]: https://github.com/rust-lang/rfcs/pull/2789
pub struct HttpRegistry<'gctx> {
    /// The name of this source, a unique string (across all sources) used as
    /// the directory name where its cached content is stored.
    name: InternedString,
    /// Path to the registry index (`$CARGO_HOME/registry/index/$REG-HASH`).
    ///
    /// `HttpRegistry` itself does not write the index files it downloads
    /// here; the only thing it caches in this directory is `config.json`.
    index_path: Filesystem,
    /// Path to the cache of `.crate` files (`$CARGO_HOME/registry/cache/$REG-HASH`).
    cache_path: Filesystem,
    /// The unique identifier of this registry source.
    source_id: SourceId,
    gctx: &'gctx GlobalContext,

    /// The server URL with the `sparse+` protocol prefix stripped.
    ///
    /// `new` guarantees it ends with `/`, so index file paths can be appended
    /// directly to build request URLs.
    url: Url,

    /// HTTP multi-handle for asynchronous/parallel requests.
    multi: Multi,

    /// Has the client requested a cache update?
    ///
    /// Set by `invalidate_cache`. Only if it is set do we double-check the
    /// freshness of each locally-stored index file.
    requested_update: bool,

    /// State for currently pending index downloads.
    downloads: Downloads<'gctx>,

    /// Does the config say that we can use HTTP multiplexing?
    ///
    /// Starts as `false` and is set in `start_fetch` from the HTTP config's
    /// `multiplexing` setting, which defaults to `true`.
    multiplexing: bool,

    /// What paths have we already fetched since the last index update?
    ///
    /// We do not need to double-check any of these index files since we have already done so.
    /// Cleared by `invalidate_cache`.
    fresh: HashSet<PathBuf>,

    /// Have we started to download any index files?
    ///
    /// Guards the one-time setup in `start_fetch`.
    fetch_started: bool,

    /// Cached registry configuration (`config.json`), loaded either from the
    /// copy stored under `index_path` or from the server.
    registry_config: Option<RegistryConfig>,

    /// Should we include the authorization header?
    ///
    /// Taken from `config.json`'s `auth_required`. It is also set after the
    /// server answers an unauthenticated `config.json` request with HTTP 401.
    auth_required: bool,

    /// Url to get a token for the registry.
    ///
    /// Parsed from the `login_url` parameter of a `Cargo` challenge in a
    /// `www-authenticate` header on an HTTP 401 response.
    login_url: Option<Url>,

    /// Headers received with an HTTP 401 for `config.json`.
    ///
    /// These are forwarded to the credential provider when requesting a token.
    auth_error_headers: Vec<String>,

    /// Disables status messages (such as the `Updating` line printed when
    /// fetching starts).
    quiet: bool,
}
112
/// State for currently pending index file downloads.
struct Downloads<'gctx> {
    /// When a download is started, it is added to this map. The key is a
    /// "token" (see [`Download::token`]). It is removed once the download is
    /// finished.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Set of paths currently being downloaded.
    /// This must stay in sync with the `pending` field. Their lengths are
    /// asserted equal when completed downloads are processed.
    pending_paths: HashSet<PathBuf>,
    /// Downloads that have failed and are waiting to retry again later.
    ///
    /// While sleeping, an entry is in neither `pending` nor `pending_paths`.
    /// It is re-added to both when it is handed back to the multi handle.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// The final result of each download, keyed by index file path.
    /// `load` removes an entry when it consumes the result.
    results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
    /// The next ID to use for creating a token (see [`Download::token`]).
    next: usize,
    /// Progress bar. It becomes `None` once `set_quiet` has been called.
    progress: RefCell<Option<Progress<'gctx>>>,
    /// Number of downloads that have finished, either successfully or with a
    /// final (non-retried) error. Attempts that are scheduled for a retry are
    /// not counted.
    downloads_finished: usize,
    /// Number of times the caller has requested blocking. This is used for
    /// an estimate of progress.
    blocking_calls: usize,
}
136
/// Represents a single index file download, including its progress and retry.
struct Download<'gctx> {
    /// The token for this download, used as the key of the
    /// [`Downloads::pending`] map and stored in [`EasyHandle`] as well.
    token: usize,

    /// The path of the index file being downloaded (e.g. `config.json` or
    /// `re/ge/regex`), relative to the registry URL.
    path: PathBuf,

    /// Response body, appended to by curl's write callback throughout the
    /// lifetime of this download.
    data: RefCell<Vec<u8>>,

    /// HTTP headers recorded by curl's header callback.
    header_map: RefCell<Headers>,

    /// Logic used to track retrying this download if it's a spurious failure.
    retry: Retry<'gctx>,
}
155
/// HTTP headers [`HttpRegistry`] cares about.
#[derive(Default)]
struct Headers {
    /// Value of the last `Last-Modified` header received.
    last_modified: Option<String>,
    /// Value of the last `ETag` header received.
    etag: Option<String>,
    /// Values of every `WWW-Authenticate` header received.
    www_authenticate: Vec<String>,
    /// All headers, including explicit headers above, each formatted as
    /// `name: value`.
    all: Vec<String>,
}
165
/// HTTP status code [`HttpRegistry`] cares about.
///
/// Any other status is turned into an error before a `StatusCode` is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatusCode {
    /// HTTP 200: the body contains the index file.
    Success,
    /// HTTP 304: the locally cached copy is still current.
    NotModified,
    /// HTTP 404, 410 or 451: the index file does not exist (or was removed).
    NotFound,
    /// HTTP 401: authorization is required or the token was rejected.
    Unauthorized,
}
173
/// Represents a complete [`Download`] from an HTTP request.
///
/// Usually it is constructed in [`HttpRegistry::handle_completed_downloads`],
/// and then returns to the caller of [`HttpRegistry::load()`].
struct CompletedDownload {
    /// The response status, already narrowed to the codes we handle.
    response_code: StatusCode,
    /// The full response body.
    data: Vec<u8>,
    /// Headers recorded for the response.
    header_map: Headers,
}
183
184impl<'gctx> HttpRegistry<'gctx> {
185    /// Creates a HTTP-rebased remote registry for `source_id`.
186    ///
187    /// * `name` --- Name of a path segment where `.crate` tarballs and the
188    ///   registry index are stored. Expect to be unique.
189    pub fn new(
190        source_id: SourceId,
191        gctx: &'gctx GlobalContext,
192        name: &str,
193    ) -> CargoResult<HttpRegistry<'gctx>> {
194        let url = source_id.url().as_str();
195        // Ensure the url ends with a slash so we can concatenate paths.
196        if !url.ends_with('/') {
197            anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
198        }
199        assert!(source_id.is_sparse());
200        let url = url
201            .strip_prefix("sparse+")
202            .expect("sparse registry needs sparse+ prefix")
203            .into_url()
204            .expect("a url with the sparse+ stripped should still be valid");
205
206        Ok(HttpRegistry {
207            name: name.into(),
208            index_path: gctx.registry_index_path().join(name),
209            cache_path: gctx.registry_cache_path().join(name),
210            source_id,
211            gctx,
212            url,
213            multi: Multi::new(),
214            multiplexing: false,
215            downloads: Downloads {
216                next: 0,
217                pending: HashMap::new(),
218                pending_paths: HashSet::new(),
219                sleeping: SleepTracker::new(),
220                results: HashMap::new(),
221                progress: RefCell::new(Some(Progress::with_style(
222                    "Fetch",
223                    ProgressStyle::Indeterminate,
224                    gctx,
225                ))),
226                downloads_finished: 0,
227                blocking_calls: 0,
228            },
229            fresh: HashSet::new(),
230            requested_update: false,
231            fetch_started: false,
232            registry_config: None,
233            auth_required: false,
234            login_url: None,
235            auth_error_headers: vec![],
236            quiet: false,
237        })
238    }
239
240    /// Splits HTTP `HEADER: VALUE` to a tuple.
241    fn handle_http_header(buf: &[u8]) -> Option<(&str, &str)> {
242        if buf.is_empty() {
243            return None;
244        }
245        let buf = std::str::from_utf8(buf).ok()?.trim_end();
246        // Don't let server sneak extra lines anywhere.
247        if buf.contains('\n') {
248            return None;
249        }
250        let (tag, value) = buf.split_once(':')?;
251        let value = value.trim();
252        Some((tag, value))
253    }
254
255    /// Setup the necessary works before the first fetch gets started.
256    ///
257    /// This is a no-op if called more than one time.
258    fn start_fetch(&mut self) -> CargoResult<()> {
259        if self.fetch_started {
260            // We only need to run the setup code once.
261            return Ok(());
262        }
263        self.fetch_started = true;
264
265        // We've enabled the `http2` feature of `curl` in Cargo, so treat
266        // failures here as fatal as it would indicate a build-time problem.
267        self.multiplexing = self.gctx.http_config()?.multiplexing.unwrap_or(true);
268
269        self.multi
270            .pipelining(false, self.multiplexing)
271            .context("failed to enable multiplexing/pipelining in curl")?;
272
273        // let's not flood the server with connections
274        self.multi.set_max_host_connections(2)?;
275
276        if !self.quiet {
277            self.gctx
278                .shell()
279                .status("Updating", self.source_id.display_index())?;
280        }
281
282        Ok(())
283    }
284
285    /// Checks the results inside the [`HttpRegistry::multi`] handle, and
286    /// updates relevant state in [`HttpRegistry::downloads`] accordingly.
287    fn handle_completed_downloads(&mut self) -> CargoResult<()> {
288        assert_eq!(
289            self.downloads.pending.len(),
290            self.downloads.pending_paths.len()
291        );
292
293        // Collect the results from the Multi handle.
294        let results = {
295            let mut results = Vec::new();
296            let pending = &mut self.downloads.pending;
297            self.multi.messages(|msg| {
298                let token = msg.token().expect("failed to read token");
299                let (_, handle) = &pending[&token];
300                if let Some(result) = msg.result_for(handle) {
301                    results.push((token, result));
302                };
303            });
304            results
305        };
306        for (token, result) in results {
307            let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
308            let was_present = self.downloads.pending_paths.remove(&download.path);
309            assert!(
310                was_present,
311                "expected pending_paths to contain {:?}",
312                download.path
313            );
314            let mut handle = self.multi.remove(handle)?;
315            let data = download.data.take();
316            let url = self.full_url(&download.path);
317            let result = match download.retry.r#try(|| {
318                result.with_context(|| format!("failed to download from `{}`", url))?;
319                let code = handle.response_code()?;
320                // Keep this list of expected status codes in sync with the codes handled in `load`
321                let code = match code {
322                    200 => StatusCode::Success,
323                    304 => StatusCode::NotModified,
324                    401 => StatusCode::Unauthorized,
325                    404 | 410 | 451 => StatusCode::NotFound,
326                    _ => {
327                        return Err(HttpNotSuccessful::new_from_handle(
328                            &mut handle,
329                            &url,
330                            data,
331                            download.header_map.take().all,
332                        )
333                        .into());
334                    }
335                };
336                Ok((data, code))
337            }) {
338                RetryResult::Success((data, code)) => Ok(CompletedDownload {
339                    response_code: code,
340                    data,
341                    header_map: download.header_map.take(),
342                }),
343                RetryResult::Err(e) => Err(e),
344                RetryResult::Retry(sleep) => {
345                    debug!(target: "network", "download retry {:?} for {sleep}ms", download.path);
346                    self.downloads.sleeping.push(sleep, (download, handle));
347                    continue;
348                }
349            };
350
351            self.downloads.results.insert(download.path, result);
352            self.downloads.downloads_finished += 1;
353        }
354
355        self.downloads.tick()?;
356
357        Ok(())
358    }
359
360    /// Constructs the full URL to download a index file.
361    fn full_url(&self, path: &Path) -> String {
362        // self.url always ends with a slash.
363        format!("{}{}", self.url, path.display())
364    }
365
366    /// Check if an index file of `path` is up-to-date.
367    ///
368    /// The `path` argument is the same as in [`RegistryData::load`].
369    fn is_fresh(&self, path: &Path) -> bool {
370        if !self.requested_update {
371            trace!(
372                "using local {} as user did not request update",
373                path.display()
374            );
375            true
376        } else if self.gctx.cli_unstable().no_index_update {
377            trace!("using local {} in no_index_update mode", path.display());
378            true
379        } else if !self.gctx.network_allowed() {
380            trace!("using local {} in offline mode", path.display());
381            true
382        } else if self.fresh.contains(path) {
383            trace!("using local {} as it was already fetched", path.display());
384            true
385        } else {
386            debug!("checking freshness of {}", path.display());
387            false
388        }
389    }
390
391    /// Get the cached registry configuration, if it exists.
392    fn config_cached(&mut self) -> CargoResult<Option<&RegistryConfig>> {
393        if self.registry_config.is_some() {
394            return Ok(self.registry_config.as_ref());
395        }
396        let config_json_path = self
397            .assert_index_locked(&self.index_path)
398            .join(RegistryConfig::NAME);
399        match fs::read(&config_json_path) {
400            Ok(raw_data) => match serde_json::from_slice(&raw_data) {
401                Ok(json) => {
402                    self.registry_config = Some(json);
403                }
404                Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
405            },
406            Err(e) => {
407                if e.kind() != ErrorKind::NotFound {
408                    tracing::debug!("failed to read config.json cache: {}", e)
409                }
410            }
411        }
412        Ok(self.registry_config.as_ref())
413    }
414
415    /// Get the registry configuration from either cache or remote.
416    fn config(&mut self) -> Poll<CargoResult<&RegistryConfig>> {
417        debug!("loading config");
418        let index_path = self.assert_index_locked(&self.index_path);
419        let config_json_path = index_path.join(RegistryConfig::NAME);
420        if self.is_fresh(Path::new(RegistryConfig::NAME)) && self.config_cached()?.is_some() {
421            return Poll::Ready(Ok(self.registry_config.as_ref().unwrap()));
422        }
423
424        match ready!(self.load(Path::new(""), Path::new(RegistryConfig::NAME), None)?) {
425            LoadResponse::Data {
426                raw_data,
427                index_version: _,
428            } => {
429                trace!("config loaded");
430                self.registry_config = Some(serde_json::from_slice(&raw_data)?);
431                if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
432                    if let Err(e) = fs::write(&config_json_path, &raw_data) {
433                        tracing::debug!("failed to write config.json cache: {}", e);
434                    }
435                }
436                Poll::Ready(Ok(self.registry_config.as_ref().unwrap()))
437            }
438            LoadResponse::NotFound => {
439                Poll::Ready(Err(anyhow::anyhow!("config.json not found in registry")))
440            }
441            LoadResponse::CacheValid => Poll::Ready(Err(crate::util::internal(
442                "config.json is never stored in the index cache",
443            ))),
444        }
445    }
446
447    /// Moves failed [`Download`]s that are ready to retry to the pending queue.
448    fn add_sleepers(&mut self) -> CargoResult<()> {
449        for (dl, handle) in self.downloads.sleeping.to_retry() {
450            let mut handle = self.multi.add(handle)?;
451            handle.set_token(dl.token)?;
452            let is_new = self.downloads.pending_paths.insert(dl.path.to_path_buf());
453            assert!(is_new, "path queued for download more than once");
454            let previous = self.downloads.pending.insert(dl.token, (dl, handle));
455            assert!(previous.is_none(), "dl token queued more than once");
456        }
457        Ok(())
458    }
459}
460
461impl<'gctx> RegistryData for HttpRegistry<'gctx> {
462    fn prepare(&self) -> CargoResult<()> {
463        self.gctx
464            .deferred_global_last_use()?
465            .mark_registry_index_used(global_cache_tracker::RegistryIndex {
466                encoded_registry_name: self.name,
467            });
468        Ok(())
469    }
470
471    fn index_path(&self) -> &Filesystem {
472        &self.index_path
473    }
474
475    fn cache_path(&self) -> &Filesystem {
476        &self.cache_path
477    }
478
479    fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
480        self.gctx
481            .assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
482    }
483
484    fn is_updated(&self) -> bool {
485        self.requested_update
486    }
487
488    fn load(
489        &mut self,
490        _root: &Path,
491        path: &Path,
492        index_version: Option<&str>,
493    ) -> Poll<CargoResult<LoadResponse>> {
494        trace!("load: {}", path.display());
495        if let Some(_token) = self.downloads.pending_paths.get(path) {
496            debug!("dependency is still pending: {}", path.display());
497            return Poll::Pending;
498        }
499
500        if let Some(index_version) = index_version {
501            trace!(
502                "local cache of {} is available at version `{}`",
503                path.display(),
504                index_version
505            );
506            if self.is_fresh(path) {
507                return Poll::Ready(Ok(LoadResponse::CacheValid));
508            }
509        } else if self.fresh.contains(path) {
510            // We have no cached copy of this file, and we already downloaded it.
511            debug!(
512                "cache did not contain previously downloaded file {}",
513                path.display()
514            );
515            return Poll::Ready(Ok(LoadResponse::NotFound));
516        }
517
518        if !self.gctx.network_allowed() || self.gctx.cli_unstable().no_index_update {
519            // Return NotFound in offline mode when the file doesn't exist in the cache.
520            // If this results in resolution failure, the resolver will suggest
521            // removing the --offline flag.
522            return Poll::Ready(Ok(LoadResponse::NotFound));
523        }
524
525        if let Some(result) = self.downloads.results.remove(path) {
526            let result =
527                result.with_context(|| format!("download of {} failed", path.display()))?;
528
529            let is_new = self.fresh.insert(path.to_path_buf());
530            assert!(
531                is_new,
532                "downloaded the index file `{}` twice",
533                path.display()
534            );
535
536            // The status handled here need to be kept in sync with the codes handled
537            // in `handle_completed_downloads`
538            match result.response_code {
539                StatusCode::Success => {
540                    let response_index_version = if let Some(etag) = result.header_map.etag {
541                        format!("{}: {}", ETAG, etag)
542                    } else if let Some(lm) = result.header_map.last_modified {
543                        format!("{}: {}", LAST_MODIFIED, lm)
544                    } else {
545                        UNKNOWN.to_string()
546                    };
547                    trace!("index file version: {}", response_index_version);
548                    return Poll::Ready(Ok(LoadResponse::Data {
549                        raw_data: result.data,
550                        index_version: Some(response_index_version),
551                    }));
552                }
553                StatusCode::NotModified => {
554                    // Not Modified: the data in the cache is still the latest.
555                    if index_version.is_none() {
556                        return Poll::Ready(Err(anyhow::anyhow!(
557                            "server said not modified (HTTP 304) when no local cache exists"
558                        )));
559                    }
560                    return Poll::Ready(Ok(LoadResponse::CacheValid));
561                }
562                StatusCode::NotFound => {
563                    // The crate was not found or deleted from the registry.
564                    return Poll::Ready(Ok(LoadResponse::NotFound));
565                }
566                StatusCode::Unauthorized
567                    if !self.auth_required && path == Path::new(RegistryConfig::NAME) =>
568                {
569                    debug!(target: "network", "re-attempting request for config.json with authorization included.");
570                    self.fresh.remove(path);
571                    self.auth_required = true;
572
573                    // Look for a `www-authenticate` header with the `Cargo` scheme.
574                    for header in &result.header_map.www_authenticate {
575                        for challenge in http_auth::ChallengeParser::new(header) {
576                            match challenge {
577                                Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
578                                    // Look for the `login_url` parameter.
579                                    for (param, value) in challenge.params {
580                                        if param.eq_ignore_ascii_case("login_url") {
581                                            self.login_url = Some(value.to_unescaped().into_url()?);
582                                        }
583                                    }
584                                }
585                                Ok(challenge) => {
586                                    debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
587                                }
588                                Err(e) => {
589                                    debug!(target: "network", "failed to parse challenge: {}", e)
590                                }
591                            }
592                        }
593                    }
594                    self.auth_error_headers = result.header_map.all;
595                }
596                StatusCode::Unauthorized => {
597                    let err = Err(HttpNotSuccessful {
598                        code: 401,
599                        body: result.data,
600                        url: self.full_url(path),
601                        ip: None,
602                        headers: result.header_map.all,
603                    }
604                    .into());
605                    if self.auth_required {
606                        let auth_error = auth::AuthorizationError::new(
607                            self.gctx,
608                            self.source_id,
609                            self.login_url.clone(),
610                            auth::AuthorizationErrorReason::TokenRejected,
611                        )?;
612                        return Poll::Ready(err.context(auth_error));
613                    } else {
614                        return Poll::Ready(err);
615                    }
616                }
617            }
618        }
619
620        if path != Path::new(RegistryConfig::NAME) {
621            self.auth_required = ready!(self.config()?).auth_required;
622        } else if !self.auth_required {
623            // Check if there's a cached config that says auth is required.
624            // This allows avoiding the initial unauthenticated request to probe.
625            if let Some(config) = self.config_cached()? {
626                self.auth_required = config.auth_required;
627            }
628        }
629
630        // Looks like we're going to have to do a network request.
631        self.start_fetch()?;
632
633        let mut handle = http_handle(self.gctx)?;
634        let full_url = self.full_url(path);
635        debug!(target: "network", "fetch {}", full_url);
636        handle.get(true)?;
637        handle.url(&full_url)?;
638        handle.follow_location(true)?;
639
640        // Enable HTTP/2 if possible.
641        crate::try_old_curl_http2_pipewait!(self.multiplexing, handle);
642
643        let mut headers = List::new();
644        // Include a header to identify the protocol. This allows the server to
645        // know that Cargo is attempting to use the sparse protocol.
646        headers.append("cargo-protocol: version=1")?;
647        headers.append("accept: text/plain")?;
648
649        // If we have a cached copy of the file, include IF_NONE_MATCH or IF_MODIFIED_SINCE header.
650        if let Some(index_version) = index_version {
651            if let Some((key, value)) = index_version.split_once(':') {
652                match key {
653                    ETAG => headers.append(&format!("{}: {}", IF_NONE_MATCH, value.trim()))?,
654                    LAST_MODIFIED => {
655                        headers.append(&format!("{}: {}", IF_MODIFIED_SINCE, value.trim()))?
656                    }
657                    _ => debug!("unexpected index version: {}", index_version),
658                }
659            }
660        }
661        if self.auth_required {
662            let authorization = auth::auth_token(
663                self.gctx,
664                &self.source_id,
665                self.login_url.as_ref(),
666                Operation::Read,
667                self.auth_error_headers.clone(),
668                true,
669            )?;
670            headers.append(&format!("Authorization: {}", authorization))?;
671            trace!(target: "network", "including authorization for {}", full_url);
672        }
673        handle.http_headers(headers)?;
674
675        // We're going to have a bunch of downloads all happening "at the same time".
676        // So, we need some way to track what headers/data/responses are for which request.
677        // We do that through this token. Each request (and associated response) gets one.
678        let token = self.downloads.next;
679        self.downloads.next += 1;
680        debug!(target: "network", "downloading {} as {}", path.display(), token);
681        let is_new = self.downloads.pending_paths.insert(path.to_path_buf());
682        assert!(is_new, "path queued for download more than once");
683
684        // Each write should go to self.downloads.pending[&token].data.
685        // Since the write function must be 'static, we access downloads through a thread-local.
686        // That thread-local is set up in `block_until_ready` when it calls self.multi.perform,
687        // which is what ultimately calls this method.
688        handle.write_function(move |buf| {
689            trace!(target: "network", "{} - {} bytes of data", token, buf.len());
690            tls::with(|downloads| {
691                if let Some(downloads) = downloads {
692                    downloads.pending[&token]
693                        .0
694                        .data
695                        .borrow_mut()
696                        .extend_from_slice(buf);
697                }
698            });
699            Ok(buf.len())
700        })?;
701
702        // And ditto for the header function.
703        handle.header_function(move |buf| {
704            if let Some((tag, value)) = Self::handle_http_header(buf) {
705                tls::with(|downloads| {
706                    if let Some(downloads) = downloads {
707                        let mut header_map = downloads.pending[&token].0.header_map.borrow_mut();
708                        header_map.all.push(format!("{tag}: {value}"));
709                        match tag.to_ascii_lowercase().as_str() {
710                            LAST_MODIFIED => header_map.last_modified = Some(value.to_string()),
711                            ETAG => header_map.etag = Some(value.to_string()),
712                            WWW_AUTHENTICATE => header_map.www_authenticate.push(value.to_string()),
713                            _ => {}
714                        }
715                    }
716                });
717            }
718
719            true
720        })?;
721
722        let dl = Download {
723            token,
724            path: path.to_path_buf(),
725            data: RefCell::new(Vec::new()),
726            header_map: Default::default(),
727            retry: Retry::new(self.gctx)?,
728        };
729
730        // Finally add the request we've lined up to the pool of requests that cURL manages.
731        let mut handle = self.multi.add(handle)?;
732        handle.set_token(token)?;
733        self.downloads.pending.insert(dl.token, (dl, handle));
734
735        Poll::Pending
736    }
737
738    fn config(&mut self) -> Poll<CargoResult<Option<RegistryConfig>>> {
739        let cfg = ready!(self.config()?).clone();
740        Poll::Ready(Ok(Some(cfg)))
741    }
742
743    fn invalidate_cache(&mut self) {
744        // Actually updating the index is more or less a no-op for this implementation.
745        // All it does is ensure that a subsequent load will double-check files with the
746        // server rather than rely on a locally cached copy of the index files.
747        debug!("invalidated index cache");
748        self.fresh.clear();
749        self.requested_update = true;
750    }
751
752    fn set_quiet(&mut self, quiet: bool) {
753        self.quiet = quiet;
754        self.downloads.progress.replace(None);
755    }
756
757    fn download(&mut self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
758        let registry_config = loop {
759            match self.config()? {
760                Poll::Pending => self.block_until_ready()?,
761                Poll::Ready(cfg) => break cfg.to_owned(),
762            }
763        };
764
765        download::download(
766            &self.cache_path,
767            &self.gctx,
768            self.name.clone(),
769            pkg,
770            checksum,
771            registry_config,
772        )
773    }
774
775    fn finish_download(
776        &mut self,
777        pkg: PackageId,
778        checksum: &str,
779        data: &[u8],
780    ) -> CargoResult<File> {
781        download::finish_download(
782            &self.cache_path,
783            &self.gctx,
784            self.name.clone(),
785            pkg,
786            checksum,
787            data,
788        )
789    }
790
791    fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
792        download::is_crate_downloaded(&self.cache_path, &self.gctx, pkg)
793    }
794
795    fn block_until_ready(&mut self) -> CargoResult<()> {
796        trace!(target: "network::HttpRegistry::block_until_ready",
797            "{} transfers pending",
798            self.downloads.pending.len()
799        );
800        self.downloads.blocking_calls += 1;
801
802        loop {
803            let remaining_in_multi = tls::set(&self.downloads, || {
804                self.multi
805                    .perform()
806                    .context("failed to perform http requests")
807            })?;
808            trace!(target: "network", "{} transfers remaining", remaining_in_multi);
809            // Handles transfers performed by `self.multi` above and adds to
810            // `self.downloads.results`. Failed transfers get added to
811            // `self.downloads.sleeping` for retry.
812            self.handle_completed_downloads()?;
813            if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
814                return Ok(());
815            }
816            // Handles failed transfers in `self.downloads.sleeping` and
817            // re-adds them to `self.multi`.
818            self.add_sleepers()?;
819
820            if self.downloads.pending.is_empty() {
821                let delay = self.downloads.sleeping.time_to_next().unwrap();
822                debug!(target: "network", "sleeping main thread for {delay:?}");
823                std::thread::sleep(delay);
824            } else {
825                // We have no more replies to provide the caller with,
826                // so we need to wait until cURL has something new for us.
827                let timeout = self
828                    .multi
829                    .get_timeout()?
830                    .unwrap_or_else(|| Duration::new(1, 0));
831                self.multi
832                    .wait(&mut [], timeout)
833                    .context("failed to wait on curl `Multi`")?;
834            }
835        }
836    }
837}
838
839impl<'gctx> Downloads<'gctx> {
840    /// Updates the state of the progress bar for downloads.
841    fn tick(&self) -> CargoResult<()> {
842        let mut progress = self.progress.borrow_mut();
843        let Some(progress) = progress.as_mut() else {
844            return Ok(());
845        };
846
847        // Since the sparse protocol discovers dependencies as it goes,
848        // it's not possible to get an accurate progress indication.
849        //
850        // As an approximation, we assume that the depth of the dependency graph
851        // is fixed, and base the progress on how many times the caller has asked
852        // for blocking. If there are actually additional dependencies, the progress
853        // bar will get stuck. If there are fewer dependencies, it will disappear
854        // early. It will never go backwards.
855        //
856        // The status text also contains the number of completed & pending requests, which
857        // gives an better indication of forward progress.
858        let approximate_tree_depth = 10;
859
860        progress.tick(
861            self.blocking_calls.min(approximate_tree_depth),
862            approximate_tree_depth + 1,
863            &format!(
864                " {} complete; {} pending",
865                self.downloads_finished,
866                self.pending.len() + self.sleeping.len()
867            ),
868        )
869    }
870}
871
872mod tls {
873    use super::Downloads;
874    use std::cell::Cell;
875
876    thread_local!(static PTR: Cell<usize> = const { Cell::new(0) });
877
878    pub(super) fn with<R>(f: impl FnOnce(Option<&Downloads<'_>>) -> R) -> R {
879        let ptr = PTR.with(|p| p.get());
880        if ptr == 0 {
881            f(None)
882        } else {
883            // Safety: * `ptr` is only set by `set` below which ensures the type is correct.
884            let ptr = unsafe { &*(ptr as *const Downloads<'_>) };
885            f(Some(ptr))
886        }
887    }
888
889    pub(super) fn set<R>(dl: &Downloads<'_>, f: impl FnOnce() -> R) -> R {
890        struct Reset<'a, T: Copy>(&'a Cell<T>, T);
891
892        impl<'a, T: Copy> Drop for Reset<'a, T> {
893            fn drop(&mut self) {
894                self.0.set(self.1);
895            }
896        }
897
898        PTR.with(|p| {
899            let _reset = Reset(p, p.get());
900            p.set(dl as *const Downloads<'_> as usize);
901            f()
902        })
903    }
904}