cargo/sources/registry/
http_remote.rs

//! Access to an HTTP-based crate registry. See [`HttpRegistry`] for details.
2
3use crate::core::global_cache_tracker;
4use crate::core::{PackageId, SourceId};
5use crate::sources::registry::download;
6use crate::sources::registry::MaybeLock;
7use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
8use crate::util::cache_lock::CacheLockMode;
9use crate::util::errors::{CargoResult, HttpNotSuccessful};
10use crate::util::interning::InternedString;
11use crate::util::network::http::http_handle;
12use crate::util::network::retry::{Retry, RetryResult};
13use crate::util::network::sleep::SleepTracker;
14use crate::util::{auth, Filesystem, GlobalContext, IntoUrl, Progress, ProgressStyle};
15use anyhow::Context as _;
16use cargo_credential::Operation;
17use cargo_util::paths;
18use curl::easy::{Easy, List};
19use curl::multi::{EasyHandle, Multi};
20use std::cell::RefCell;
21use std::collections::{HashMap, HashSet};
22use std::fs::{self, File};
23use std::io::ErrorKind;
24use std::path::{Path, PathBuf};
25use std::str;
26use std::task::{ready, Poll};
27use std::time::Duration;
28use tracing::{debug, trace};
29use url::Url;
30
// Names of the HTTP headers this module reads from responses or sends with
// requests. They are lowercase because response header names are lowercased
// before being compared against them.
const ETAG: &str = "etag";
const LAST_MODIFIED: &str = "last-modified";
const WWW_AUTHENTICATE: &str = "www-authenticate";
const IF_NONE_MATCH: &str = "if-none-match";
const IF_MODIFIED_SINCE: &str = "if-modified-since";

// Index version recorded when a successful response carries neither an `etag`
// nor a `last-modified` header.
const UNKNOWN: &str = "Unknown";
39
/// A registry served by the HTTP-based registry API.
///
/// This type is primarily accessed through the [`RegistryData`] trait.
///
/// `HttpRegistry` implements the HTTP-based registry API outlined in [RFC 2789]. Read the RFC for
/// the complete protocol, but _roughly_ the implementation loads each index file (e.g.,
/// config.json or re/ge/regex) from an HTTP service rather than from a locally cloned git
/// repository. The remote service can more or less be a static file server that simply serves the
/// contents of the origin git repository.
///
/// Implemented naively, this leads to a significant amount of network traffic, as a lookup of any
/// index file would need to check with the remote backend if the index file has changed. This
/// cost is somewhat mitigated by the use of HTTP conditional fetches (`If-Modified-Since` and
/// `If-None-Match` for `ETag`s) which can be efficiently handled by HTTP/2.
///
/// [RFC 2789]: https://github.com/rust-lang/rfcs/pull/2789
pub struct HttpRegistry<'gctx> {
    /// The name of this source, a unique string (across all sources) used as
    /// the directory name where its cached content is stored.
    name: InternedString,
    /// Path to the registry index (`$CARGO_HOME/registry/index/$REG-HASH`).
    ///
    /// To be fair, `HttpRegistry` doesn't store the registry index it
    /// downloads on the file system, but other cached data like registry
    /// configuration could be stored here.
    index_path: Filesystem,
    /// Path to the cache of `.crate` files (`$CARGO_HOME/registry/cache/$REG-HASH`).
    cache_path: Filesystem,
    /// The unique identifier of this registry source.
    source_id: SourceId,
    /// The global context, used to read settings (HTTP configuration, offline
    /// mode, unstable flags) and to print status messages to the shell.
    gctx: &'gctx GlobalContext,

    /// The server URL with the `sparse+` protocol prefix stripped.
    ///
    /// It always ends with a slash, so index file paths can be appended
    /// directly to it.
    url: Url,

    /// HTTP multi-handle for asynchronous/parallel requests.
    multi: Multi,

    /// Has the client requested a cache update?
    ///
    /// Only if they have do we double-check the freshness of each locally-stored index file.
    requested_update: bool,

    /// State for currently pending index downloads.
    downloads: Downloads<'gctx>,

    /// Does the config say that we can use HTTP multiplexing?
    ///
    /// This starts out as `false` and is only populated from the
    /// configuration right before the first fetch.
    multiplexing: bool,

    /// What paths have we already fetched since the last index update?
    ///
    /// We do not need to double-check any of these index files since we have already done so.
    fresh: HashSet<PathBuf>,

    /// Have we started to download any index files?
    fetch_started: bool,

    /// Cached registry configuration.
    registry_config: Option<RegistryConfig>,

    /// Should we include the authorization header?
    auth_required: bool,

    /// Url to get a token for the registry.
    login_url: Option<Url>,

    /// Headers received with an HTTP 401.
    auth_error_headers: Vec<String>,

    /// Disables status messages.
    quiet: bool,
}
112
/// State for currently pending index file downloads.
struct Downloads<'gctx> {
    /// When a download is started, it is added to this map. The key is a
    /// "token" (see [`Download::token`]). It is removed once the download is
    /// finished.
    ///
    /// Each entry also holds the [`EasyHandle`] of the transfer while it is
    /// attached to the multi-handle.
    pending: HashMap<usize, (Download<'gctx>, EasyHandle)>,
    /// Set of paths currently being downloaded.
    /// This should stay in sync with the `pending` field.
    pending_paths: HashSet<PathBuf>,
    /// Downloads that have failed and are waiting to retry again later.
    sleeping: SleepTracker<(Download<'gctx>, Easy)>,
    /// The final result of each download, keyed by the path of the index file.
    results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
    /// The next ID to use for creating a token (see [`Download::token`]).
    next: usize,
    /// Progress bar. It is `None` once status output has been silenced.
    progress: RefCell<Option<Progress<'gctx>>>,
    /// Number of downloads that have finished and had their result recorded
    /// (successful or not); downloads put to sleep for a retry are not counted.
    downloads_finished: usize,
    /// Number of times the caller has requested blocking. This is used for
    /// an estimate of progress.
    blocking_calls: usize,
}
136
/// Represents a single index file download, including its progress and retry.
struct Download<'gctx> {
    /// The token for this download, used as the key of the
    /// [`Downloads::pending`] map and stored in [`EasyHandle`] as well.
    token: usize,

    /// The path of the index file that we're downloading, relative to the
    /// registry URL.
    path: PathBuf,

    /// Actual downloaded data, updated throughout the lifetime of this download.
    data: RefCell<Vec<u8>>,

    /// HTTP headers received so far for this download.
    header_map: RefCell<Headers>,

    /// Logic used to track retrying this download if it's a spurious failure.
    retry: Retry<'gctx>,
}
155
/// HTTP response headers [`HttpRegistry`] cares about.
#[derive(Default)]
struct Headers {
    /// Value of the `last-modified` header, if one was received.
    last_modified: Option<String>,
    /// Value of the `etag` header, if one was received.
    etag: Option<String>,
    /// Values of every `www-authenticate` header received.
    www_authenticate: Vec<String>,
    /// All headers, including explicit headers above, each formatted as
    /// `name: value`.
    all: Vec<String>,
}
165
/// HTTP status code [`HttpRegistry`] cares about.
///
/// Any response code that does not map to one of these variants is reported
/// as an error instead.
enum StatusCode {
    /// HTTP 200: the response body holds the index file.
    Success,
    /// HTTP 304: the locally cached copy is still current.
    NotModified,
    /// HTTP 404, 410 or 451: the index file is not available.
    NotFound,
    /// HTTP 401: the server requires (different) credentials.
    Unauthorized,
}
173
/// Represents a complete [`Download`] from an HTTP request.
///
/// Usually it is constructed in [`HttpRegistry::handle_completed_downloads`],
/// and then consumed by [`HttpRegistry::load()`].
struct CompletedDownload {
    /// The classified HTTP status of the response.
    response_code: StatusCode,
    /// The response body.
    data: Vec<u8>,
    /// The response headers collected while the download was in flight.
    header_map: Headers,
}
183
184impl<'gctx> HttpRegistry<'gctx> {
185    /// Creates a HTTP-rebased remote registry for `source_id`.
186    ///
187    /// * `name` --- Name of a path segment where `.crate` tarballs and the
188    ///   registry index are stored. Expect to be unique.
189    pub fn new(
190        source_id: SourceId,
191        gctx: &'gctx GlobalContext,
192        name: &str,
193    ) -> CargoResult<HttpRegistry<'gctx>> {
194        let url = source_id.url().as_str();
195        // Ensure the url ends with a slash so we can concatenate paths.
196        if !url.ends_with('/') {
197            anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
198        }
199        assert!(source_id.is_sparse());
200        let url = url
201            .strip_prefix("sparse+")
202            .expect("sparse registry needs sparse+ prefix")
203            .into_url()
204            .expect("a url with the sparse+ stripped should still be valid");
205
206        Ok(HttpRegistry {
207            name: name.into(),
208            index_path: gctx.registry_index_path().join(name),
209            cache_path: gctx.registry_cache_path().join(name),
210            source_id,
211            gctx,
212            url,
213            multi: Multi::new(),
214            multiplexing: false,
215            downloads: Downloads {
216                next: 0,
217                pending: HashMap::new(),
218                pending_paths: HashSet::new(),
219                sleeping: SleepTracker::new(),
220                results: HashMap::new(),
221                progress: RefCell::new(Some(Progress::with_style(
222                    "Fetch",
223                    ProgressStyle::Indeterminate,
224                    gctx,
225                ))),
226                downloads_finished: 0,
227                blocking_calls: 0,
228            },
229            fresh: HashSet::new(),
230            requested_update: false,
231            fetch_started: false,
232            registry_config: None,
233            auth_required: false,
234            login_url: None,
235            auth_error_headers: vec![],
236            quiet: false,
237        })
238    }
239
240    /// Splits HTTP `HEADER: VALUE` to a tuple.
241    fn handle_http_header(buf: &[u8]) -> Option<(&str, &str)> {
242        if buf.is_empty() {
243            return None;
244        }
245        let buf = std::str::from_utf8(buf).ok()?.trim_end();
246        // Don't let server sneak extra lines anywhere.
247        if buf.contains('\n') {
248            return None;
249        }
250        let (tag, value) = buf.split_once(':')?;
251        let value = value.trim();
252        Some((tag, value))
253    }
254
255    /// Setup the necessary works before the first fetch gets started.
256    ///
257    /// This is a no-op if called more than one time.
258    fn start_fetch(&mut self) -> CargoResult<()> {
259        if self.fetch_started {
260            // We only need to run the setup code once.
261            return Ok(());
262        }
263        self.fetch_started = true;
264
265        // We've enabled the `http2` feature of `curl` in Cargo, so treat
266        // failures here as fatal as it would indicate a build-time problem.
267        self.multiplexing = self.gctx.http_config()?.multiplexing.unwrap_or(true);
268
269        self.multi
270            .pipelining(false, self.multiplexing)
271            .context("failed to enable multiplexing/pipelining in curl")?;
272
273        // let's not flood the server with connections
274        self.multi.set_max_host_connections(2)?;
275
276        if !self.quiet {
277            self.gctx
278                .shell()
279                .status("Updating", self.source_id.display_index())?;
280        }
281
282        Ok(())
283    }
284
    /// Checks the results inside the [`HttpRegistry::multi`] handle, and
    /// updates relevant state in [`HttpRegistry::downloads`] accordingly.
    ///
    /// Every finished transfer is removed from the pending set and detached
    /// from the multi-handle. Its outcome is then either stored in
    /// [`Downloads::results`] (success or permanent failure) or the download
    /// is put to sleep to be retried later.
    ///
    /// # Panics
    ///
    /// Panics if `pending` and `pending_paths` have gone out of sync, or if
    /// curl reports a message for a token that is not pending.
    fn handle_completed_downloads(&mut self) -> CargoResult<()> {
        assert_eq!(
            self.downloads.pending.len(),
            self.downloads.pending_paths.len()
        );

        // Collect the results from the Multi handle.
        let results = {
            let mut results = Vec::new();
            let pending = &mut self.downloads.pending;
            self.multi.messages(|msg| {
                let token = msg.token().expect("failed to read token");
                let (_, handle) = &pending[&token];
                if let Some(result) = msg.result_for(handle) {
                    results.push((token, result));
                };
            });
            results
        };
        for (token, result) in results {
            // The transfer is over: stop tracking it as pending and detach its
            // handle from the multi-handle.
            let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
            let was_present = self.downloads.pending_paths.remove(&download.path);
            assert!(
                was_present,
                "expected pending_paths to contain {:?}",
                download.path
            );
            let mut handle = self.multi.remove(handle)?;
            let data = download.data.take();
            let url = self.full_url(&download.path);
            // The retry tracker decides whether a failure is final or worth
            // another attempt.
            let result = match download.retry.r#try(|| {
                result.with_context(|| format!("failed to download from `{}`", url))?;
                let code = handle.response_code()?;
                // Keep this list of expected status codes in sync with the codes handled in `load`
                let code = match code {
                    200 => StatusCode::Success,
                    304 => StatusCode::NotModified,
                    401 => StatusCode::Unauthorized,
                    404 | 410 | 451 => StatusCode::NotFound,
                    _ => {
                        // Any other status is an error; attach the body and
                        // headers received so far to it.
                        return Err(HttpNotSuccessful::new_from_handle(
                            &mut handle,
                            &url,
                            data,
                            download.header_map.take().all,
                        )
                        .into());
                    }
                };
                Ok((data, code))
            }) {
                RetryResult::Success((data, code)) => Ok(CompletedDownload {
                    response_code: code,
                    data,
                    header_map: download.header_map.take(),
                }),
                RetryResult::Err(e) => Err(e),
                RetryResult::Retry(sleep) => {
                    debug!(target: "network", "download retry {:?} for {sleep}ms", download.path);
                    // Park the download; it is neither recorded as a result
                    // nor counted as finished.
                    self.downloads.sleeping.push(sleep, (download, handle));
                    continue;
                }
            };

            self.downloads.results.insert(download.path, result);
            self.downloads.downloads_finished += 1;
        }

        self.downloads.tick()?;

        Ok(())
    }
359
360    /// Constructs the full URL to download a index file.
361    fn full_url(&self, path: &Path) -> String {
362        // self.url always ends with a slash.
363        format!("{}{}", self.url, path.display())
364    }
365
366    /// Check if an index file of `path` is up-to-date.
367    ///
368    /// The `path` argument is the same as in [`RegistryData::load`].
369    fn is_fresh(&self, path: &Path) -> bool {
370        if !self.requested_update {
371            trace!(
372                "using local {} as user did not request update",
373                path.display()
374            );
375            true
376        } else if self.gctx.cli_unstable().no_index_update {
377            trace!("using local {} in no_index_update mode", path.display());
378            true
379        } else if self.gctx.offline() {
380            trace!("using local {} in offline mode", path.display());
381            true
382        } else if self.fresh.contains(path) {
383            trace!("using local {} as it was already fetched", path.display());
384            true
385        } else {
386            debug!("checking freshness of {}", path.display());
387            false
388        }
389    }
390
391    /// Get the cached registry configuration, if it exists.
392    fn config_cached(&mut self) -> CargoResult<Option<&RegistryConfig>> {
393        if self.registry_config.is_some() {
394            return Ok(self.registry_config.as_ref());
395        }
396        let config_json_path = self
397            .assert_index_locked(&self.index_path)
398            .join(RegistryConfig::NAME);
399        match fs::read(&config_json_path) {
400            Ok(raw_data) => match serde_json::from_slice(&raw_data) {
401                Ok(json) => {
402                    self.registry_config = Some(json);
403                }
404                Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
405            },
406            Err(e) => {
407                if e.kind() != ErrorKind::NotFound {
408                    tracing::debug!("failed to read config.json cache: {}", e)
409                }
410            }
411        }
412        Ok(self.registry_config.as_ref())
413    }
414
415    /// Get the registry configuration from either cache or remote.
416    fn config(&mut self) -> Poll<CargoResult<&RegistryConfig>> {
417        debug!("loading config");
418        let index_path = self.assert_index_locked(&self.index_path);
419        let config_json_path = index_path.join(RegistryConfig::NAME);
420        if self.is_fresh(Path::new(RegistryConfig::NAME)) && self.config_cached()?.is_some() {
421            return Poll::Ready(Ok(self.registry_config.as_ref().unwrap()));
422        }
423
424        match ready!(self.load(Path::new(""), Path::new(RegistryConfig::NAME), None)?) {
425            LoadResponse::Data {
426                raw_data,
427                index_version: _,
428            } => {
429                trace!("config loaded");
430                self.registry_config = Some(serde_json::from_slice(&raw_data)?);
431                if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
432                    if let Err(e) = fs::write(&config_json_path, &raw_data) {
433                        tracing::debug!("failed to write config.json cache: {}", e);
434                    }
435                }
436                Poll::Ready(Ok(self.registry_config.as_ref().unwrap()))
437            }
438            LoadResponse::NotFound => {
439                Poll::Ready(Err(anyhow::anyhow!("config.json not found in registry")))
440            }
441            LoadResponse::CacheValid => Poll::Ready(Err(crate::util::internal(
442                "config.json is never stored in the index cache",
443            ))),
444        }
445    }
446
447    /// Moves failed [`Download`]s that are ready to retry to the pending queue.
448    fn add_sleepers(&mut self) -> CargoResult<()> {
449        for (dl, handle) in self.downloads.sleeping.to_retry() {
450            let mut handle = self.multi.add(handle)?;
451            handle.set_token(dl.token)?;
452            let is_new = self.downloads.pending_paths.insert(dl.path.to_path_buf());
453            assert!(is_new, "path queued for download more than once");
454            let previous = self.downloads.pending.insert(dl.token, (dl, handle));
455            assert!(previous.is_none(), "dl token queued more than once");
456        }
457        Ok(())
458    }
459}
460
461impl<'gctx> RegistryData for HttpRegistry<'gctx> {
462    fn prepare(&self) -> CargoResult<()> {
463        self.gctx
464            .deferred_global_last_use()?
465            .mark_registry_index_used(global_cache_tracker::RegistryIndex {
466                encoded_registry_name: self.name,
467            });
468        Ok(())
469    }
470
    /// Returns the directory reserved for this registry's index data
    /// (see the `index_path` field).
    fn index_path(&self) -> &Filesystem {
        &self.index_path
    }
474
    /// Resolves `path` to a plain [`Path`] by delegating to the global
    /// context's package-cache lock assertion in
    /// [`CacheLockMode::DownloadExclusive`] mode.
    fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
        self.gctx
            .assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
    }
479
    /// Returns whether an index update has been requested, i.e. whether
    /// `invalidate_cache` has been called on this registry.
    fn is_updated(&self) -> bool {
        self.requested_update
    }
483
    /// Loads the index file at `path`, scheduling a download when needed.
    ///
    /// * `_root` --- Unused by this implementation.
    /// * `path` --- Path of the index file, relative to the registry URL.
    /// * `index_version` --- Version string of the locally cached copy, if
    ///   any, in the form `etag: <value>` or `last-modified: <value>`. It is
    ///   turned into a conditional request header.
    ///
    /// Returns `Poll::Pending` when a download for `path` is in flight or has
    /// just been scheduled; the caller is expected to call
    /// `block_until_ready` and then call `load` again to collect the result.
    ///
    /// # Panics
    ///
    /// Panics if the result of a download for `path` is consumed twice
    /// without the cache being invalidated in between.
    fn load(
        &mut self,
        _root: &Path,
        path: &Path,
        index_version: Option<&str>,
    ) -> Poll<CargoResult<LoadResponse>> {
        trace!("load: {}", path.display());
        // A transfer for this file is already in flight; nothing to do yet.
        if let Some(_token) = self.downloads.pending_paths.get(path) {
            debug!("dependency is still pending: {}", path.display());
            return Poll::Pending;
        }

        if let Some(index_version) = index_version {
            trace!(
                "local cache of {} is available at version `{}`",
                path.display(),
                index_version
            );
            if self.is_fresh(path) {
                return Poll::Ready(Ok(LoadResponse::CacheValid));
            }
        } else if self.fresh.contains(path) {
            // We have no cached copy of this file, and we already downloaded it.
            debug!(
                "cache did not contain previously downloaded file {}",
                path.display()
            );
            return Poll::Ready(Ok(LoadResponse::NotFound));
        }

        if self.gctx.offline() || self.gctx.cli_unstable().no_index_update {
            // Return NotFound in offline mode when the file doesn't exist in the cache.
            // If this results in resolution failure, the resolver will suggest
            // removing the --offline flag.
            return Poll::Ready(Ok(LoadResponse::NotFound));
        }

        // A previously scheduled download for this file has finished; consume
        // its result.
        if let Some(result) = self.downloads.results.remove(path) {
            let result =
                result.with_context(|| format!("download of {} failed", path.display()))?;

            let is_new = self.fresh.insert(path.to_path_buf());
            assert!(
                is_new,
                "downloaded the index file `{}` twice",
                path.display()
            );

            // The status handled here need to be kept in sync with the codes handled
            // in `handle_completed_downloads`
            match result.response_code {
                StatusCode::Success => {
                    // Prefer the `etag` over `last-modified` as the version
                    // of the downloaded file.
                    let response_index_version = if let Some(etag) = result.header_map.etag {
                        format!("{}: {}", ETAG, etag)
                    } else if let Some(lm) = result.header_map.last_modified {
                        format!("{}: {}", LAST_MODIFIED, lm)
                    } else {
                        UNKNOWN.to_string()
                    };
                    trace!("index file version: {}", response_index_version);
                    return Poll::Ready(Ok(LoadResponse::Data {
                        raw_data: result.data,
                        index_version: Some(response_index_version),
                    }));
                }
                StatusCode::NotModified => {
                    // Not Modified: the data in the cache is still the latest.
                    if index_version.is_none() {
                        return Poll::Ready(Err(anyhow::anyhow!(
                            "server said not modified (HTTP 304) when no local cache exists"
                        )));
                    }
                    return Poll::Ready(Ok(LoadResponse::CacheValid));
                }
                StatusCode::NotFound => {
                    // The crate was not found or deleted from the registry.
                    return Poll::Ready(Ok(LoadResponse::NotFound));
                }
                // First 401 for config.json while no credentials were sent:
                // remember that auth is required and fall through to issue
                // the request again, this time with authorization.
                StatusCode::Unauthorized
                    if !self.auth_required && path == Path::new(RegistryConfig::NAME) =>
                {
                    debug!(target: "network", "re-attempting request for config.json with authorization included.");
                    self.fresh.remove(path);
                    self.auth_required = true;

                    // Look for a `www-authenticate` header with the `Cargo` scheme.
                    for header in &result.header_map.www_authenticate {
                        for challenge in http_auth::ChallengeParser::new(header) {
                            match challenge {
                                Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
                                    // Look for the `login_url` parameter.
                                    for (param, value) in challenge.params {
                                        if param.eq_ignore_ascii_case("login_url") {
                                            self.login_url = Some(value.to_unescaped().into_url()?);
                                        }
                                    }
                                }
                                Ok(challenge) => {
                                    debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
                                }
                                Err(e) => {
                                    debug!(target: "network", "failed to parse challenge: {}", e)
                                }
                            }
                        }
                    }
                    self.auth_error_headers = result.header_map.all;
                }
                // Any other 401 is final and is reported to the caller.
                StatusCode::Unauthorized => {
                    let err = Err(HttpNotSuccessful {
                        code: 401,
                        body: result.data,
                        url: self.full_url(path),
                        ip: None,
                        headers: result.header_map.all,
                    }
                    .into());
                    if self.auth_required {
                        let auth_error = auth::AuthorizationError::new(
                            self.gctx,
                            self.source_id,
                            self.login_url.clone(),
                            auth::AuthorizationErrorReason::TokenRejected,
                        )?;
                        return Poll::Ready(err.context(auth_error));
                    } else {
                        return Poll::Ready(err);
                    }
                }
            }
        }

        // Find out whether the request has to carry credentials. For every
        // file other than config.json this requires the registry configuration.
        if path != Path::new(RegistryConfig::NAME) {
            self.auth_required = ready!(self.config()?).auth_required;
        } else if !self.auth_required {
            // Check if there's a cached config that says auth is required.
            // This allows avoiding the initial unauthenticated request to probe.
            if let Some(config) = self.config_cached()? {
                self.auth_required = config.auth_required;
            }
        }

        // Looks like we're going to have to do a network request.
        self.start_fetch()?;

        let mut handle = http_handle(self.gctx)?;
        let full_url = self.full_url(path);
        debug!(target: "network", "fetch {}", full_url);
        handle.get(true)?;
        handle.url(&full_url)?;
        handle.follow_location(true)?;

        // Enable HTTP/2 if possible.
        crate::try_old_curl_http2_pipewait!(self.multiplexing, handle);

        let mut headers = List::new();
        // Include a header to identify the protocol. This allows the server to
        // know that Cargo is attempting to use the sparse protocol.
        headers.append("cargo-protocol: version=1")?;
        headers.append("accept: text/plain")?;

        // If we have a cached copy of the file, include IF_NONE_MATCH or IF_MODIFIED_SINCE header.
        if let Some(index_version) = index_version {
            if let Some((key, value)) = index_version.split_once(':') {
                match key {
                    ETAG => headers.append(&format!("{}: {}", IF_NONE_MATCH, value.trim()))?,
                    LAST_MODIFIED => {
                        headers.append(&format!("{}: {}", IF_MODIFIED_SINCE, value.trim()))?
                    }
                    _ => debug!("unexpected index version: {}", index_version),
                }
            }
        }
        if self.auth_required {
            let authorization = auth::auth_token(
                self.gctx,
                &self.source_id,
                self.login_url.as_ref(),
                Operation::Read,
                self.auth_error_headers.clone(),
                true,
            )?;
            headers.append(&format!("Authorization: {}", authorization))?;
            trace!(target: "network", "including authorization for {}", full_url);
        }
        handle.http_headers(headers)?;

        // We're going to have a bunch of downloads all happening "at the same time".
        // So, we need some way to track what headers/data/responses are for which request.
        // We do that through this token. Each request (and associated response) gets one.
        let token = self.downloads.next;
        self.downloads.next += 1;
        debug!(target: "network", "downloading {} as {}", path.display(), token);
        let is_new = self.downloads.pending_paths.insert(path.to_path_buf());
        assert!(is_new, "path queued for download more than once");

        // Each write should go to self.downloads.pending[&token].data.
        // Since the write function must be 'static, we access downloads through a thread-local.
        // That thread-local is set up in `block_until_ready` when it calls self.multi.perform,
        // which is what ultimately invokes this closure.
        handle.write_function(move |buf| {
            trace!(target: "network", "{} - {} bytes of data", token, buf.len());
            tls::with(|downloads| {
                if let Some(downloads) = downloads {
                    downloads.pending[&token]
                        .0
                        .data
                        .borrow_mut()
                        .extend_from_slice(buf);
                }
            });
            Ok(buf.len())
        })?;

        // And ditto for the header function.
        handle.header_function(move |buf| {
            if let Some((tag, value)) = Self::handle_http_header(buf) {
                tls::with(|downloads| {
                    if let Some(downloads) = downloads {
                        let mut header_map = downloads.pending[&token].0.header_map.borrow_mut();
                        header_map.all.push(format!("{tag}: {value}"));
                        // Header names are matched case-insensitively.
                        match tag.to_ascii_lowercase().as_str() {
                            LAST_MODIFIED => header_map.last_modified = Some(value.to_string()),
                            ETAG => header_map.etag = Some(value.to_string()),
                            WWW_AUTHENTICATE => header_map.www_authenticate.push(value.to_string()),
                            _ => {}
                        }
                    }
                });
            }

            true
        })?;

        let dl = Download {
            token,
            path: path.to_path_buf(),
            data: RefCell::new(Vec::new()),
            header_map: Default::default(),
            retry: Retry::new(self.gctx)?,
        };

        // Finally add the request we've lined up to the pool of requests that cURL manages.
        let mut handle = self.multi.add(handle)?;
        handle.set_token(token)?;
        self.downloads.pending.insert(dl.token, (dl, handle));

        Poll::Pending
    }
733
734    fn config(&mut self) -> Poll<CargoResult<Option<RegistryConfig>>> {
735        let cfg = ready!(self.config()?).clone();
736        Poll::Ready(Ok(Some(cfg)))
737    }
738
739    fn invalidate_cache(&mut self) {
740        // Actually updating the index is more or less a no-op for this implementation.
741        // All it does is ensure that a subsequent load will double-check files with the
742        // server rather than rely on a locally cached copy of the index files.
743        debug!("invalidated index cache");
744        self.fresh.clear();
745        self.requested_update = true;
746    }
747
748    fn set_quiet(&mut self, quiet: bool) {
749        self.quiet = quiet;
750        self.downloads.progress.replace(None);
751    }
752
753    fn download(&mut self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
754        let registry_config = loop {
755            match self.config()? {
756                Poll::Pending => self.block_until_ready()?,
757                Poll::Ready(cfg) => break cfg.to_owned(),
758            }
759        };
760
761        download::download(
762            &self.cache_path,
763            &self.gctx,
764            self.name.clone(),
765            pkg,
766            checksum,
767            registry_config,
768        )
769    }
770
771    fn finish_download(
772        &mut self,
773        pkg: PackageId,
774        checksum: &str,
775        data: &[u8],
776    ) -> CargoResult<File> {
777        download::finish_download(
778            &self.cache_path,
779            &self.gctx,
780            self.name.clone(),
781            pkg,
782            checksum,
783            data,
784        )
785    }
786
787    fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
788        download::is_crate_downloaded(&self.cache_path, &self.gctx, pkg)
789    }
790
791    fn block_until_ready(&mut self) -> CargoResult<()> {
792        trace!(target: "network::HttpRegistry::block_until_ready",
793            "{} transfers pending",
794            self.downloads.pending.len()
795        );
796        self.downloads.blocking_calls += 1;
797
798        loop {
799            let remaining_in_multi = tls::set(&self.downloads, || {
800                self.multi
801                    .perform()
802                    .context("failed to perform http requests")
803            })?;
804            trace!(target: "network", "{} transfers remaining", remaining_in_multi);
805            // Handles transfers performed by `self.multi` above and adds to
806            // `self.downloads.results`. Failed transfers get added to
807            // `self.downloads.sleeping` for retry.
808            self.handle_completed_downloads()?;
809            if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
810                return Ok(());
811            }
812            // Handles failed transfers in `self.downloads.sleeping` and
813            // re-adds them to `self.multi`.
814            self.add_sleepers()?;
815
816            if self.downloads.pending.is_empty() {
817                let delay = self.downloads.sleeping.time_to_next().unwrap();
818                debug!(target: "network", "sleeping main thread for {delay:?}");
819                std::thread::sleep(delay);
820            } else {
821                // We have no more replies to provide the caller with,
822                // so we need to wait until cURL has something new for us.
823                let timeout = self
824                    .multi
825                    .get_timeout()?
826                    .unwrap_or_else(|| Duration::new(1, 0));
827                self.multi
828                    .wait(&mut [], timeout)
829                    .context("failed to wait on curl `Multi`")?;
830            }
831        }
832    }
833}
834
835impl<'gctx> Downloads<'gctx> {
836    /// Updates the state of the progress bar for downloads.
837    fn tick(&self) -> CargoResult<()> {
838        let mut progress = self.progress.borrow_mut();
839        let Some(progress) = progress.as_mut() else {
840            return Ok(());
841        };
842
843        // Since the sparse protocol discovers dependencies as it goes,
844        // it's not possible to get an accurate progress indication.
845        //
846        // As an approximation, we assume that the depth of the dependency graph
847        // is fixed, and base the progress on how many times the caller has asked
848        // for blocking. If there are actually additional dependencies, the progress
849        // bar will get stuck. If there are fewer dependencies, it will disappear
850        // early. It will never go backwards.
851        //
852        // The status text also contains the number of completed & pending requests, which
853        // gives an better indication of forward progress.
854        let approximate_tree_depth = 10;
855
856        progress.tick(
857            self.blocking_calls.min(approximate_tree_depth),
858            approximate_tree_depth + 1,
859            &format!(
860                " {} complete; {} pending",
861                self.downloads_finished,
862                self.pending.len() + self.sleeping.len()
863            ),
864        )
865    }
866}
867
868mod tls {
869    use super::Downloads;
870    use std::cell::Cell;
871
872    thread_local!(static PTR: Cell<usize> = Cell::new(0));
873
874    pub(super) fn with<R>(f: impl FnOnce(Option<&Downloads<'_>>) -> R) -> R {
875        let ptr = PTR.with(|p| p.get());
876        if ptr == 0 {
877            f(None)
878        } else {
879            // Safety: * `ptr` is only set by `set` below which ensures the type is correct.
880            let ptr = unsafe { &*(ptr as *const Downloads<'_>) };
881            f(Some(ptr))
882        }
883    }
884
885    pub(super) fn set<R>(dl: &Downloads<'_>, f: impl FnOnce() -> R) -> R {
886        struct Reset<'a, T: Copy>(&'a Cell<T>, T);
887
888        impl<'a, T: Copy> Drop for Reset<'a, T> {
889            fn drop(&mut self) {
890                self.0.set(self.1);
891            }
892        }
893
894        PTR.with(|p| {
895            let _reset = Reset(p, p.get());
896            p.set(dl as *const Downloads<'_> as usize);
897            f()
898        })
899    }
900}