Skip to main content

cargo/sources/registry/
http_remote.rs

1//! Access to a HTTP-based crate registry. See [`HttpRegistry`] for details.
2
3use crate::core::PackageId;
4use crate::core::SourceId;
5use crate::core::global_cache_tracker;
6use crate::sources::registry::LoadResponse;
7use crate::sources::registry::MaybeLock;
8use crate::sources::registry::RegistryConfig;
9use crate::sources::registry::RegistryData;
10use crate::sources::registry::download;
11use crate::util::Filesystem;
12use crate::util::GlobalContext;
13use crate::util::IntoUrl;
14use crate::util::Progress;
15use crate::util::ProgressStyle;
16use crate::util::auth;
17use crate::util::cache_lock::CacheLockMode;
18use crate::util::errors::CargoResult;
19use crate::util::errors::HttpNotSuccessful;
20use crate::util::interning::InternedString;
21use crate::util::network::http_async::ResponsePartsExtensions;
22use crate::util::network::retry::Retry;
23use crate::util::network::retry::RetryResult;
24use anyhow::Context as _;
25use cargo_credential::Operation;
26use cargo_util::paths;
27use futures::lock::Mutex;
28use http::HeaderName;
29use http::HeaderValue;
30use std::cell::Cell;
31use std::cell::RefCell;
32use std::collections::HashSet;
33use std::fs;
34use std::fs::File;
35use std::io::ErrorKind;
36use std::path::Path;
37use std::str;
38use std::time::Duration;
39use tracing::debug;
40use tracing::trace;
41use tracing::warn;
42use url::Url;
43
// Prefixes used when recording which HTTP validator identified a cached index
// file. The recorded "index version" has the form `<prefix>: <header value>`.

/// Index-version prefix for a value taken from the `ETag` response header.
const ETAG: &str = "etag";
/// Index-version prefix for a value taken from the `Last-Modified` response header.
const LAST_MODIFIED: &str = "last-modified";

/// Index version recorded when the server sent neither validator header.
const UNKNOWN: &str = "Unknown";
49
50/// A registry served by the HTTP-based registry API.
51///
52/// This type is primarily accessed through the [`RegistryData`] trait.
53///
54/// `HttpRegistry` implements the HTTP-based registry API outlined in [RFC 2789]. Read the RFC for
55/// the complete protocol, but _roughly_ the implementation loads each index file (e.g.,
56/// config.json or re/ge/regex) from an HTTP service rather than from a locally cloned git
57/// repository. The remote service can more or less be a static file server that simply serves the
58/// contents of the origin git repository.
59///
60/// Implemented naively, this leads to a significant amount of network traffic, as a lookup of any
61/// index file would need to check with the remote backend if the index file has changed. This
62/// cost is somewhat mitigated by the use of HTTP conditional fetches (`If-Modified-Since` and
63/// `If-None-Match` for `ETag`s) which can be efficiently handled by HTTP/2.
64///
65/// [RFC 2789]: https://github.com/rust-lang/rfcs/pull/2789
66pub struct HttpRegistry<'gctx> {
67    /// The name of this source, a unique string (across all sources) used as
68    /// the directory name where its cached content is stored.
69    name: InternedString,
70
71    /// Cached registry configuration.
72    registry_config: Mutex<Option<RegistryConfig>>,
73
74    /// Backend used for making network requests.
75    inner: HttpBackend<'gctx>,
76}
77
78impl<'gctx> HttpRegistry<'gctx> {
79    /// Creates a HTTP-rebased remote registry for `source_id`.
80    ///
81    /// * `name` --- Name of a path segment where `.crate` tarballs and the
82    ///   registry index are stored. Expect to be unique.
83    pub fn new(
84        source_id: SourceId,
85        gctx: &'gctx GlobalContext,
86        name: &str,
87    ) -> CargoResult<HttpRegistry<'gctx>> {
88        Ok(HttpRegistry {
89            name: name.into(),
90            registry_config: Mutex::new(None),
91            inner: HttpBackend::new(source_id, gctx, name)?,
92        })
93    }
94
95    fn inner(&self) -> &HttpBackend<'gctx> {
96        &self.inner
97    }
98
99    /// Get the registry configuration from either cache or remote.
100    async fn config(&self) -> CargoResult<RegistryConfig> {
101        let Some(config) = self.config_opt().await? else {
102            return Err(anyhow::anyhow!("config.json not found"));
103        };
104        Ok(config)
105    }
106
107    /// Get the registry configuration from either cache or remote.
108    /// Returns None if the config is not available.
109    async fn config_opt(&self) -> CargoResult<Option<RegistryConfig>> {
110        let mut config = self.registry_config.lock().await;
111        if let Some(config) = &*config
112            && self.inner().is_fresh(RegistryConfig::NAME)
113        {
114            Ok(Some(config.clone()))
115        } else {
116            let result = self.config_opt_inner().await?;
117            *config = result.clone();
118            Ok(result)
119        }
120    }
121
122    async fn config_opt_inner(&self) -> CargoResult<Option<RegistryConfig>> {
123        debug!("loading config");
124        let index_path = self.assert_index_locked(&self.inner().index_cache_path);
125        let config_json_path = index_path.join(RegistryConfig::NAME);
126        if self.inner().is_fresh(RegistryConfig::NAME)
127            && let Some(config) = self.config_from_filesystem()
128        {
129            return Ok(Some(config.clone()));
130        }
131
132        // Check if there's a cached config that says auth is required.
133        // This allows avoiding the initial unauthenticated request to probe.
134        if let Some(c) = self.config_from_filesystem() {
135            self.inner().auth_required.update(|v| v || c.auth_required);
136        }
137
138        let response = self
139            .inner()
140            .fetch_uncached(RegistryConfig::NAME, None)
141            .await;
142        let response = match response {
143            Err(e)
144                if !self.inner().auth_required.get()
145                    && e.downcast_ref::<HttpNotSuccessful>()
146                        .map(|e| e.code == 401)
147                        .unwrap_or_default() =>
148            {
149                self.inner().auth_required.set(true);
150                debug!(target: "network", "re-attempting request for config.json with authorization included.");
151                self.inner()
152                    .fetch_uncached(RegistryConfig::NAME, None)
153                    .await
154            }
155            resp => resp,
156        }?;
157
158        match response {
159            LoadResponse::Data {
160                raw_data,
161                index_version: _,
162            } => {
163                trace!("config loaded");
164                let config = Some(serde_json::from_slice(&raw_data)?);
165                if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
166                    if let Err(e) = fs::write(&config_json_path, &raw_data) {
167                        tracing::debug!("failed to write config.json cache: {}", e);
168                    }
169                }
170                Ok(config)
171            }
172            LoadResponse::NotFound => Ok(None),
173            LoadResponse::CacheValid => Err(crate::util::internal(
174                "config.json is never stored in the index cache",
175            )),
176        }
177    }
178
179    /// Get the cached registry configuration from the filesystem, if it exists.
180    fn config_from_filesystem(&self) -> Option<RegistryConfig> {
181        let config_json_path = self
182            .assert_index_locked(&self.inner().index_cache_path)
183            .join(RegistryConfig::NAME);
184        match fs::read(&config_json_path) {
185            Ok(raw_data) => match serde_json::from_slice(&raw_data) {
186                Ok(json) => return Some(json),
187                Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
188            },
189            Err(e) => {
190                if e.kind() != ErrorKind::NotFound {
191                    tracing::debug!("failed to read config.json cache: {}", e)
192                }
193            }
194        }
195        None
196    }
197
198    async fn sparse_fetch(
199        &self,
200        path: &str,
201        index_version: Option<&str>,
202    ) -> CargoResult<LoadResponse> {
203        if let Some(index_version) = index_version {
204            trace!("local cache of {path} is available at version `{index_version}`",);
205            if self.inner().is_fresh(&path) {
206                return Ok(LoadResponse::CacheValid);
207            }
208        } else if self.inner().fresh.borrow().contains(path) {
209            // We have no cached copy of this file, and we already downloaded it.
210            debug!("cache did not contain previously downloaded file {path}",);
211            return Ok(LoadResponse::NotFound);
212        }
213
214        // If we have a cached copy of the file, include IF_NONE_MATCH or IF_MODIFIED_SINCE header.
215        let index_version =
216            index_version
217                .and_then(|v| v.split_once(':'))
218                .and_then(|(key, value)| match key {
219                    ETAG => Some((
220                        http::header::IF_NONE_MATCH,
221                        HeaderValue::from_str(value.trim()).ok()?,
222                    )),
223                    LAST_MODIFIED => Some((
224                        http::header::IF_MODIFIED_SINCE,
225                        HeaderValue::from_str(value.trim()).ok()?,
226                    )),
227                    _ => {
228                        debug!("unexpected index version: {}", index_version.unwrap());
229                        None
230                    }
231                });
232        let index_version = index_version.as_ref().map(|(k, v)| (k, v));
233        self.inner().fetch_uncached(&path, index_version).await
234    }
235}
236
237#[async_trait::async_trait(?Send)]
238impl<'gctx> RegistryData for HttpRegistry<'gctx> {
239    fn prepare(&self) -> CargoResult<()> {
240        self.inner()
241            .gctx
242            .deferred_global_last_use()?
243            .mark_registry_index_used(global_cache_tracker::RegistryIndex {
244                encoded_registry_name: self.name,
245            });
246        Ok(())
247    }
248
249    fn index_path(&self) -> &Filesystem {
250        &self.inner().index_cache_path
251    }
252
253    fn cache_path(&self) -> &Filesystem {
254        &self.inner().crate_cache_path
255    }
256
257    fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
258        self.inner()
259            .gctx
260            .assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
261    }
262
263    fn is_updated(&self) -> bool {
264        self.inner().requested_update.get()
265    }
266
267    async fn load(
268        &self,
269        _root: &Path,
270        path: &Path,
271        index_version: Option<&str>,
272    ) -> CargoResult<LoadResponse> {
273        // Ensure the config is loaded.
274        let Some(config) = self.config_opt().await? else {
275            return Ok(LoadResponse::NotFound);
276        };
277        self.inner()
278            .auth_required
279            .update(|v| v || config.auth_required);
280
281        let path = path
282            .to_str()
283            .ok_or_else(|| anyhow::anyhow!("non UTF8 path: {}", path.display()))?;
284        self.sparse_fetch(path, index_version).await
285    }
286
287    async fn config(&self) -> CargoResult<Option<RegistryConfig>> {
288        Ok(Some(self.config().await?))
289    }
290
291    fn invalidate_cache(&self) {
292        // Actually updating the index is more or less a no-op for this implementation.
293        // All it does is ensure that a subsequent load will double-check files with the
294        // server rather than rely on a locally cached copy of the index files.
295        debug!("invalidated index cache");
296        self.inner().fresh.borrow_mut().clear();
297        self.inner().requested_update.set(true);
298    }
299
300    fn set_quiet(&mut self, quiet: bool) {
301        self.inner().quiet.set(quiet);
302        self.inner().progress.replace(None);
303    }
304
305    fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
306        let registry_config = crate::util::block_on(self.config())?;
307        download::download(
308            &self.inner().crate_cache_path,
309            &self.inner().gctx,
310            self.name.clone(),
311            pkg,
312            checksum,
313            registry_config,
314        )
315    }
316
317    fn finish_download(&self, pkg: PackageId, checksum: &str, data: &[u8]) -> CargoResult<File> {
318        download::finish_download(
319            &self.inner().crate_cache_path,
320            &self.inner().gctx,
321            self.name.clone(),
322            pkg,
323            checksum,
324            data,
325        )
326    }
327
328    fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
329        download::is_crate_downloaded(&self.inner().crate_cache_path, &self.inner().gctx, pkg)
330    }
331}
332
333struct HttpBackend<'gctx> {
334    /// Path to the registry index (`$CARGO_HOME/registry/index/$REG-HASH`).
335    index_cache_path: Filesystem,
336
337    /// Path to the cache of `.crate` files (`$CARGO_HOME/registry/cache/$REG-HASH`).
338    crate_cache_path: Filesystem,
339
340    /// The unique identifier of this registry source.
341    source_id: SourceId,
342    gctx: &'gctx GlobalContext,
343
344    /// Store the server URL without the protocol prefix (sparse+)
345    url: Url,
346
347    /// Has the client requested a cache update?
348    ///
349    /// Only if they have do we double-check the freshness of each locally-stored index file.
350    requested_update: Cell<bool>,
351
352    /// Progress bar for transfers.
353    progress: RefCell<Option<Progress<'gctx>>>,
354
355    /// Number of in-flight requests.
356    pending: Cell<usize>,
357
358    /// What paths have we already fetched since the last index update?
359    ///
360    /// We do not need to double-check any of these index files since we have already done so.
361    fresh: RefCell<HashSet<String>>,
362
363    /// Have we started to download any index files?
364    fetch_started: Cell<bool>,
365
366    /// Should we include the authorization header?
367    auth_required: Cell<bool>,
368
369    /// Url to get a token for the registry.
370    login_url: RefCell<Option<Url>>,
371
372    /// Headers received with an HTTP 401.
373    auth_error_headers: RefCell<Vec<String>>,
374
375    /// Disables status messages.
376    quiet: Cell<bool>,
377}
378
379impl<'gctx> HttpBackend<'gctx> {
380    pub fn new(
381        source_id: SourceId,
382        gctx: &'gctx GlobalContext,
383        name: &str,
384    ) -> CargoResult<HttpBackend<'gctx>> {
385        let url = source_id.url().as_str();
386        // Ensure the url ends with a slash so we can concatenate paths.
387        if !url.ends_with('/') {
388            anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
389        }
390        assert!(source_id.is_sparse());
391        let url = url
392            .strip_prefix("sparse+")
393            .expect("sparse registry needs sparse+ prefix")
394            .into_url()
395            .expect("a url with the sparse+ stripped should still be valid");
396
397        let index_cache_path = gctx.registry_index_path().join(name);
398        Ok(HttpBackend {
399            index_cache_path: index_cache_path.clone(),
400            crate_cache_path: gctx.registry_cache_path().join(name),
401            source_id,
402            gctx,
403            url,
404            progress: RefCell::new(Some(Progress::with_style(
405                "Fetch",
406                ProgressStyle::Indeterminate,
407                gctx,
408            ))),
409            fresh: RefCell::new(HashSet::new()),
410            requested_update: Cell::new(false),
411            fetch_started: Cell::new(false),
412            auth_required: Cell::new(false),
413            login_url: RefCell::new(None),
414            auth_error_headers: RefCell::new(vec![]),
415            quiet: Cell::new(false),
416            pending: Cell::new(0),
417        })
418    }
419
420    /// Constructs the full URL to download a index file.
421    fn full_url(&self, path: &str) -> String {
422        // self.url always ends with a slash.
423        format!("{}{}", self.url, path)
424    }
425
426    /// Setup the necessary works before the first fetch gets started.
427    ///
428    /// This is a no-op if called more than one time.
429    fn start_fetch(&self) -> CargoResult<()> {
430        if self.fetch_started.get() {
431            // We only need to run the setup code once.
432            return Ok(());
433        }
434        self.fetch_started.set(true);
435
436        if !self.quiet.get() {
437            self.gctx
438                .shell()
439                .status("Updating", self.source_id.display_index())?;
440        }
441
442        Ok(())
443    }
444
445    /// Are we in offline mode?
446    ///
447    /// Return NotFound in offline mode when the file doesn't exist in the cache.
448    /// If this results in resolution failure, the resolver will suggest
449    /// removing the --offline flag.
450    fn offline(&self) -> bool {
451        !self.gctx.network_allowed() || self.gctx.cli_unstable().no_index_update
452    }
453
454    /// Check if an index file of `path` is up-to-date.
455    fn is_fresh(&self, path: &str) -> bool {
456        if !self.requested_update.get() {
457            trace!("using local {path} as user did not request update",);
458            true
459        } else if self.offline() {
460            trace!("using local {path} in offline mode");
461            true
462        } else if self.fresh.borrow().contains(path) {
463            trace!("using local {path} as it was already fetched");
464            true
465        } else {
466            debug!("checking freshness of {path}");
467            false
468        }
469    }
470
471    async fn fetch_uncached(
472        &self,
473        path: &str,
474        extra_header: Option<(&HeaderName, &HeaderValue)>,
475    ) -> CargoResult<LoadResponse> {
476        if self.offline() {
477            return Ok(LoadResponse::NotFound);
478        }
479
480        if !self.fresh.borrow_mut().insert(path.to_string()) {
481            warn!("downloaded the index file `{path}` twice");
482        }
483
484        let mut r = Retry::new(self.gctx)?;
485        self.pending.update(|v| v + 1);
486        let response = loop {
487            let response = self.fetch_uncached_no_retry(path, extra_header).await;
488            match r.r#try(|| response) {
489                RetryResult::Success(result) => break Ok(result),
490                RetryResult::Err(error) => break Err(error),
491                RetryResult::Retry(delay_ms) => {
492                    futures_timer::Delay::new(Duration::from_millis(delay_ms)).await;
493                }
494            }
495        };
496        self.pending.update(|v| v - 1);
497        response
498    }
499
500    async fn fetch_uncached_no_retry(
501        &self,
502        path: &str,
503        extra_header: Option<(&HeaderName, &HeaderValue)>,
504    ) -> CargoResult<LoadResponse> {
505        trace!("load: {path}");
506        self.start_fetch()?;
507        let full_url = self.full_url(path);
508        let mut request = http::Request::get(&full_url);
509
510        // Include a header to identify the protocol. This allows the server to
511        // know that Cargo is attempting to use the sparse protocol.
512        request = request.header("cargo-protocol", "version=1");
513        request = request.header(http::header::ACCEPT, "text/plain");
514
515        if let Some((k, v)) = extra_header {
516            request = request.header(k, v);
517        }
518
519        if self.auth_required.get() {
520            let authorization = auth::auth_token(
521                self.gctx,
522                &self.source_id,
523                self.login_url.borrow().clone().as_ref(),
524                Operation::Read,
525                self.auth_error_headers.borrow().clone(),
526                true,
527            )?;
528            request = request.header(http::header::AUTHORIZATION, authorization);
529            trace!(target: "network", "including authorization for {}", full_url);
530        }
531
532        let response = self
533            .gctx
534            .http_async()?
535            .request(request.body(Vec::new())?)
536            .await
537            .with_context(|| format!("download of {path} failed"))?;
538
539        self.tick()?;
540
541        let (response, body) = response.into_parts();
542
543        match response.status {
544            http::StatusCode::OK => {
545                let response_index_version =
546                    if let Some(etag) = response.headers.get(http::header::ETAG) {
547                        format!("{}: {}", ETAG, etag.to_str().unwrap())
548                    } else if let Some(lm) = response.headers.get(http::header::LAST_MODIFIED) {
549                        format!("{}: {}", LAST_MODIFIED, lm.to_str().unwrap())
550                    } else {
551                        UNKNOWN.to_string()
552                    };
553                trace!("index file version: {}", response_index_version);
554                Ok(LoadResponse::Data {
555                    raw_data: body,
556                    index_version: Some(response_index_version),
557                })
558            }
559            http::StatusCode::NOT_MODIFIED => {
560                // Not Modified: the data in the cache is still the latest.
561                Ok(LoadResponse::CacheValid)
562            }
563            http::StatusCode::NOT_FOUND => {
564                // The crate was not found or deleted from the registry.
565                return Ok(LoadResponse::NotFound);
566            }
567            http::StatusCode::UNAUTHORIZED => {
568                // Store the headers for later error reporting if needed.
569                self.auth_error_headers.replace(
570                    response
571                        .headers
572                        .iter()
573                        .map(|(name, value)| {
574                            format!("{}: {}", name.as_str(), value.to_str().unwrap_or_default())
575                        })
576                        .collect(),
577                );
578
579                // Look for a `www-authenticate` header with the `Cargo` scheme.
580                for value in &response.headers.get_all(http::header::WWW_AUTHENTICATE) {
581                    for challenge in
582                        http_auth::ChallengeParser::new(value.to_str().unwrap_or_default())
583                    {
584                        match challenge {
585                            Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
586                                // Look for the `login_url` parameter.
587                                for (param, value) in challenge.params {
588                                    if param.eq_ignore_ascii_case("login_url") {
589                                        self.login_url
590                                            .replace(Some(value.to_unescaped().into_url()?));
591                                    }
592                                }
593                            }
594                            Ok(challenge) => {
595                                debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
596                            }
597                            Err(e) => {
598                                debug!(target: "network", "failed to parse challenge: {}", e)
599                            }
600                        }
601                    }
602                }
603
604                let mut err = Err(HttpNotSuccessful {
605                    code: http::StatusCode::UNAUTHORIZED.as_u16() as u32,
606                    body: body,
607                    url: full_url,
608                    ip: None,
609                    headers: response
610                        .headers
611                        .iter()
612                        .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap_or_default()))
613                        .collect(),
614                }
615                .into());
616                if self.auth_required.get() {
617                    let auth_error = auth::AuthorizationError::new(
618                        self.gctx,
619                        self.source_id,
620                        self.login_url.borrow().clone(),
621                        auth::AuthorizationErrorReason::TokenRejected,
622                    )?;
623                    err = err.context(auth_error)
624                }
625                err
626            }
627            code => Err(HttpNotSuccessful {
628                code: code.as_u16() as u32,
629                body: body,
630                url: full_url,
631                ip: response.client_ip().map(str::to_owned),
632                headers: response
633                    .headers
634                    .iter()
635                    .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap_or_default()))
636                    .collect(),
637            }
638            .into()),
639        }
640    }
641
642    /// Updates the state of the progress bar for downloads.
643    fn tick(&self) -> CargoResult<()> {
644        let mut progress = self.progress.borrow_mut();
645        let Some(progress) = progress.as_mut() else {
646            return Ok(());
647        };
648
649        if progress.update_allowed() {
650            let complete = self.fresh.borrow().len();
651            let pending = self.pending.get();
652            progress.print_now(&format!("{complete} complete; {pending} pending"))?;
653        }
654        Ok(())
655    }
656}