1use crate::core::PackageId;
4use crate::core::SourceId;
5use crate::core::global_cache_tracker;
6use crate::sources::registry::LoadResponse;
7use crate::sources::registry::MaybeLock;
8use crate::sources::registry::RegistryConfig;
9use crate::sources::registry::RegistryData;
10use crate::sources::registry::download;
11use crate::util::Filesystem;
12use crate::util::GlobalContext;
13use crate::util::IntoUrl;
14use crate::util::Progress;
15use crate::util::ProgressStyle;
16use crate::util::auth;
17use crate::util::cache_lock::CacheLockMode;
18use crate::util::errors::CargoResult;
19use crate::util::errors::HttpNotSuccessful;
20use crate::util::interning::InternedString;
21use crate::util::network::http_async::ResponsePartsExtensions;
22use crate::util::network::retry::Retry;
23use crate::util::network::retry::RetryResult;
24use anyhow::Context as _;
25use cargo_credential::Operation;
26use cargo_util::paths;
27use futures::lock::Mutex;
28use http::HeaderName;
29use http::HeaderValue;
30use std::cell::Cell;
31use std::cell::RefCell;
32use std::collections::HashSet;
33use std::fs;
34use std::fs::File;
35use std::io::ErrorKind;
36use std::path::Path;
37use std::str;
38use std::time::Duration;
39use tracing::debug;
40use tracing::trace;
41use tracing::warn;
42use url::Url;
43
/// Prefix used when an `ETag` response header is recorded as an index version.
const ETAG: &str = "etag";
/// Prefix used when a `Last-Modified` response header is recorded as an index version.
const LAST_MODIFIED: &str = "last-modified";

/// Index version recorded when the server sent neither `ETag` nor `Last-Modified`.
const UNKNOWN: &str = "Unknown";
49
50pub struct HttpRegistry<'gctx> {
67 name: InternedString,
70
71 registry_config: Mutex<Option<RegistryConfig>>,
73
74 inner: HttpBackend<'gctx>,
76}
77
78impl<'gctx> HttpRegistry<'gctx> {
79 pub fn new(
84 source_id: SourceId,
85 gctx: &'gctx GlobalContext,
86 name: &str,
87 ) -> CargoResult<HttpRegistry<'gctx>> {
88 Ok(HttpRegistry {
89 name: name.into(),
90 registry_config: Mutex::new(None),
91 inner: HttpBackend::new(source_id, gctx, name)?,
92 })
93 }
94
95 fn inner(&self) -> &HttpBackend<'gctx> {
96 &self.inner
97 }
98
99 async fn config(&self) -> CargoResult<RegistryConfig> {
101 let Some(config) = self.config_opt().await? else {
102 return Err(anyhow::anyhow!("config.json not found"));
103 };
104 Ok(config)
105 }
106
107 async fn config_opt(&self) -> CargoResult<Option<RegistryConfig>> {
110 let mut config = self.registry_config.lock().await;
111 if let Some(config) = &*config
112 && self.inner().is_fresh(RegistryConfig::NAME)
113 {
114 Ok(Some(config.clone()))
115 } else {
116 let result = self.config_opt_inner().await?;
117 *config = result.clone();
118 Ok(result)
119 }
120 }
121
122 async fn config_opt_inner(&self) -> CargoResult<Option<RegistryConfig>> {
123 debug!("loading config");
124 let index_path = self.assert_index_locked(&self.inner().index_cache_path);
125 let config_json_path = index_path.join(RegistryConfig::NAME);
126 if self.inner().is_fresh(RegistryConfig::NAME)
127 && let Some(config) = self.config_from_filesystem()
128 {
129 return Ok(Some(config.clone()));
130 }
131
132 if let Some(c) = self.config_from_filesystem() {
135 self.inner().auth_required.update(|v| v || c.auth_required);
136 }
137
138 let response = self
139 .inner()
140 .fetch_uncached(RegistryConfig::NAME, None)
141 .await;
142 let response = match response {
143 Err(e)
144 if !self.inner().auth_required.get()
145 && e.downcast_ref::<HttpNotSuccessful>()
146 .map(|e| e.code == 401)
147 .unwrap_or_default() =>
148 {
149 self.inner().auth_required.set(true);
150 debug!(target: "network", "re-attempting request for config.json with authorization included.");
151 self.inner()
152 .fetch_uncached(RegistryConfig::NAME, None)
153 .await
154 }
155 resp => resp,
156 }?;
157
158 match response {
159 LoadResponse::Data {
160 raw_data,
161 index_version: _,
162 } => {
163 trace!("config loaded");
164 let config = Some(serde_json::from_slice(&raw_data)?);
165 if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
166 if let Err(e) = fs::write(&config_json_path, &raw_data) {
167 tracing::debug!("failed to write config.json cache: {}", e);
168 }
169 }
170 Ok(config)
171 }
172 LoadResponse::NotFound => Ok(None),
173 LoadResponse::CacheValid => Err(crate::util::internal(
174 "config.json is never stored in the index cache",
175 )),
176 }
177 }
178
179 fn config_from_filesystem(&self) -> Option<RegistryConfig> {
181 let config_json_path = self
182 .assert_index_locked(&self.inner().index_cache_path)
183 .join(RegistryConfig::NAME);
184 match fs::read(&config_json_path) {
185 Ok(raw_data) => match serde_json::from_slice(&raw_data) {
186 Ok(json) => return Some(json),
187 Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
188 },
189 Err(e) => {
190 if e.kind() != ErrorKind::NotFound {
191 tracing::debug!("failed to read config.json cache: {}", e)
192 }
193 }
194 }
195 None
196 }
197
198 async fn sparse_fetch(
199 &self,
200 path: &str,
201 index_version: Option<&str>,
202 ) -> CargoResult<LoadResponse> {
203 if let Some(index_version) = index_version {
204 trace!("local cache of {path} is available at version `{index_version}`",);
205 if self.inner().is_fresh(&path) {
206 return Ok(LoadResponse::CacheValid);
207 }
208 } else if self.inner().fresh.borrow().contains(path) {
209 debug!("cache did not contain previously downloaded file {path}",);
211 return Ok(LoadResponse::NotFound);
212 }
213
214 let index_version =
216 index_version
217 .and_then(|v| v.split_once(':'))
218 .and_then(|(key, value)| match key {
219 ETAG => Some((
220 http::header::IF_NONE_MATCH,
221 HeaderValue::from_str(value.trim()).ok()?,
222 )),
223 LAST_MODIFIED => Some((
224 http::header::IF_MODIFIED_SINCE,
225 HeaderValue::from_str(value.trim()).ok()?,
226 )),
227 _ => {
228 debug!("unexpected index version: {}", index_version.unwrap());
229 None
230 }
231 });
232 let index_version = index_version.as_ref().map(|(k, v)| (k, v));
233 self.inner().fetch_uncached(&path, index_version).await
234 }
235}
236
237#[async_trait::async_trait(?Send)]
238impl<'gctx> RegistryData for HttpRegistry<'gctx> {
239 fn prepare(&self) -> CargoResult<()> {
240 self.inner()
241 .gctx
242 .deferred_global_last_use()?
243 .mark_registry_index_used(global_cache_tracker::RegistryIndex {
244 encoded_registry_name: self.name,
245 });
246 Ok(())
247 }
248
249 fn index_path(&self) -> &Filesystem {
250 &self.inner().index_cache_path
251 }
252
253 fn cache_path(&self) -> &Filesystem {
254 &self.inner().crate_cache_path
255 }
256
257 fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
258 self.inner()
259 .gctx
260 .assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
261 }
262
263 fn is_updated(&self) -> bool {
264 self.inner().requested_update.get()
265 }
266
267 async fn load(
268 &self,
269 _root: &Path,
270 path: &Path,
271 index_version: Option<&str>,
272 ) -> CargoResult<LoadResponse> {
273 let Some(config) = self.config_opt().await? else {
275 return Ok(LoadResponse::NotFound);
276 };
277 self.inner()
278 .auth_required
279 .update(|v| v || config.auth_required);
280
281 let path = path
282 .to_str()
283 .ok_or_else(|| anyhow::anyhow!("non UTF8 path: {}", path.display()))?;
284 self.sparse_fetch(path, index_version).await
285 }
286
287 async fn config(&self) -> CargoResult<Option<RegistryConfig>> {
288 Ok(Some(self.config().await?))
289 }
290
291 fn invalidate_cache(&self) {
292 debug!("invalidated index cache");
296 self.inner().fresh.borrow_mut().clear();
297 self.inner().requested_update.set(true);
298 }
299
300 fn set_quiet(&mut self, quiet: bool) {
301 self.inner().quiet.set(quiet);
302 self.inner().progress.replace(None);
303 }
304
305 fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
306 let registry_config = crate::util::block_on(self.config())?;
307 download::download(
308 &self.inner().crate_cache_path,
309 &self.inner().gctx,
310 self.name.clone(),
311 pkg,
312 checksum,
313 registry_config,
314 )
315 }
316
317 fn finish_download(&self, pkg: PackageId, checksum: &str, data: &[u8]) -> CargoResult<File> {
318 download::finish_download(
319 &self.inner().crate_cache_path,
320 &self.inner().gctx,
321 self.name.clone(),
322 pkg,
323 checksum,
324 data,
325 )
326 }
327
328 fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
329 download::is_crate_downloaded(&self.inner().crate_cache_path, &self.inner().gctx, pkg)
330 }
331}
332
333struct HttpBackend<'gctx> {
334 index_cache_path: Filesystem,
336
337 crate_cache_path: Filesystem,
339
340 source_id: SourceId,
342 gctx: &'gctx GlobalContext,
343
344 url: Url,
346
347 requested_update: Cell<bool>,
351
352 progress: RefCell<Option<Progress<'gctx>>>,
354
355 pending: Cell<usize>,
357
358 fresh: RefCell<HashSet<String>>,
362
363 fetch_started: Cell<bool>,
365
366 auth_required: Cell<bool>,
368
369 login_url: RefCell<Option<Url>>,
371
372 auth_error_headers: RefCell<Vec<String>>,
374
375 quiet: Cell<bool>,
377}
378
379impl<'gctx> HttpBackend<'gctx> {
380 pub fn new(
381 source_id: SourceId,
382 gctx: &'gctx GlobalContext,
383 name: &str,
384 ) -> CargoResult<HttpBackend<'gctx>> {
385 let url = source_id.url().as_str();
386 if !url.ends_with('/') {
388 anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
389 }
390 assert!(source_id.is_sparse());
391 let url = url
392 .strip_prefix("sparse+")
393 .expect("sparse registry needs sparse+ prefix")
394 .into_url()
395 .expect("a url with the sparse+ stripped should still be valid");
396
397 let index_cache_path = gctx.registry_index_path().join(name);
398 Ok(HttpBackend {
399 index_cache_path: index_cache_path.clone(),
400 crate_cache_path: gctx.registry_cache_path().join(name),
401 source_id,
402 gctx,
403 url,
404 progress: RefCell::new(Some(Progress::with_style(
405 "Fetch",
406 ProgressStyle::Indeterminate,
407 gctx,
408 ))),
409 fresh: RefCell::new(HashSet::new()),
410 requested_update: Cell::new(false),
411 fetch_started: Cell::new(false),
412 auth_required: Cell::new(false),
413 login_url: RefCell::new(None),
414 auth_error_headers: RefCell::new(vec![]),
415 quiet: Cell::new(false),
416 pending: Cell::new(0),
417 })
418 }
419
420 fn full_url(&self, path: &str) -> String {
422 format!("{}{}", self.url, path)
424 }
425
426 fn start_fetch(&self) -> CargoResult<()> {
430 if self.fetch_started.get() {
431 return Ok(());
433 }
434 self.fetch_started.set(true);
435
436 if !self.quiet.get() {
437 self.gctx
438 .shell()
439 .status("Updating", self.source_id.display_index())?;
440 }
441
442 Ok(())
443 }
444
445 fn offline(&self) -> bool {
451 !self.gctx.network_allowed() || self.gctx.cli_unstable().no_index_update
452 }
453
454 fn is_fresh(&self, path: &str) -> bool {
456 if !self.requested_update.get() {
457 trace!("using local {path} as user did not request update",);
458 true
459 } else if self.offline() {
460 trace!("using local {path} in offline mode");
461 true
462 } else if self.fresh.borrow().contains(path) {
463 trace!("using local {path} as it was already fetched");
464 true
465 } else {
466 debug!("checking freshness of {path}");
467 false
468 }
469 }
470
471 async fn fetch_uncached(
472 &self,
473 path: &str,
474 extra_header: Option<(&HeaderName, &HeaderValue)>,
475 ) -> CargoResult<LoadResponse> {
476 if self.offline() {
477 return Ok(LoadResponse::NotFound);
478 }
479
480 if !self.fresh.borrow_mut().insert(path.to_string()) {
481 warn!("downloaded the index file `{path}` twice");
482 }
483
484 let mut r = Retry::new(self.gctx)?;
485 self.pending.update(|v| v + 1);
486 let response = loop {
487 let response = self.fetch_uncached_no_retry(path, extra_header).await;
488 match r.r#try(|| response) {
489 RetryResult::Success(result) => break Ok(result),
490 RetryResult::Err(error) => break Err(error),
491 RetryResult::Retry(delay_ms) => {
492 futures_timer::Delay::new(Duration::from_millis(delay_ms)).await;
493 }
494 }
495 };
496 self.pending.update(|v| v - 1);
497 response
498 }
499
500 async fn fetch_uncached_no_retry(
501 &self,
502 path: &str,
503 extra_header: Option<(&HeaderName, &HeaderValue)>,
504 ) -> CargoResult<LoadResponse> {
505 trace!("load: {path}");
506 self.start_fetch()?;
507 let full_url = self.full_url(path);
508 let mut request = http::Request::get(&full_url);
509
510 request = request.header("cargo-protocol", "version=1");
513 request = request.header(http::header::ACCEPT, "text/plain");
514
515 if let Some((k, v)) = extra_header {
516 request = request.header(k, v);
517 }
518
519 if self.auth_required.get() {
520 let authorization = auth::auth_token(
521 self.gctx,
522 &self.source_id,
523 self.login_url.borrow().clone().as_ref(),
524 Operation::Read,
525 self.auth_error_headers.borrow().clone(),
526 true,
527 )?;
528 request = request.header(http::header::AUTHORIZATION, authorization);
529 trace!(target: "network", "including authorization for {}", full_url);
530 }
531
532 let response = self
533 .gctx
534 .http_async()?
535 .request(request.body(Vec::new())?)
536 .await
537 .with_context(|| format!("download of {path} failed"))?;
538
539 self.tick()?;
540
541 let (response, body) = response.into_parts();
542
543 match response.status {
544 http::StatusCode::OK => {
545 let response_index_version =
546 if let Some(etag) = response.headers.get(http::header::ETAG) {
547 format!("{}: {}", ETAG, etag.to_str().unwrap())
548 } else if let Some(lm) = response.headers.get(http::header::LAST_MODIFIED) {
549 format!("{}: {}", LAST_MODIFIED, lm.to_str().unwrap())
550 } else {
551 UNKNOWN.to_string()
552 };
553 trace!("index file version: {}", response_index_version);
554 Ok(LoadResponse::Data {
555 raw_data: body,
556 index_version: Some(response_index_version),
557 })
558 }
559 http::StatusCode::NOT_MODIFIED => {
560 Ok(LoadResponse::CacheValid)
562 }
563 http::StatusCode::NOT_FOUND => {
564 return Ok(LoadResponse::NotFound);
566 }
567 http::StatusCode::UNAUTHORIZED => {
568 self.auth_error_headers.replace(
570 response
571 .headers
572 .iter()
573 .map(|(name, value)| {
574 format!("{}: {}", name.as_str(), value.to_str().unwrap_or_default())
575 })
576 .collect(),
577 );
578
579 for value in &response.headers.get_all(http::header::WWW_AUTHENTICATE) {
581 for challenge in
582 http_auth::ChallengeParser::new(value.to_str().unwrap_or_default())
583 {
584 match challenge {
585 Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
586 for (param, value) in challenge.params {
588 if param.eq_ignore_ascii_case("login_url") {
589 self.login_url
590 .replace(Some(value.to_unescaped().into_url()?));
591 }
592 }
593 }
594 Ok(challenge) => {
595 debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
596 }
597 Err(e) => {
598 debug!(target: "network", "failed to parse challenge: {}", e)
599 }
600 }
601 }
602 }
603
604 let mut err = Err(HttpNotSuccessful {
605 code: http::StatusCode::UNAUTHORIZED.as_u16() as u32,
606 body: body,
607 url: full_url,
608 ip: None,
609 headers: response
610 .headers
611 .iter()
612 .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap_or_default()))
613 .collect(),
614 }
615 .into());
616 if self.auth_required.get() {
617 let auth_error = auth::AuthorizationError::new(
618 self.gctx,
619 self.source_id,
620 self.login_url.borrow().clone(),
621 auth::AuthorizationErrorReason::TokenRejected,
622 )?;
623 err = err.context(auth_error)
624 }
625 err
626 }
627 code => Err(HttpNotSuccessful {
628 code: code.as_u16() as u32,
629 body: body,
630 url: full_url,
631 ip: response.client_ip().map(str::to_owned),
632 headers: response
633 .headers
634 .iter()
635 .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap_or_default()))
636 .collect(),
637 }
638 .into()),
639 }
640 }
641
642 fn tick(&self) -> CargoResult<()> {
644 let mut progress = self.progress.borrow_mut();
645 let Some(progress) = progress.as_mut() else {
646 return Ok(());
647 };
648
649 if progress.update_allowed() {
650 let complete = self.fresh.borrow().len();
651 let pending = self.pending.get();
652 progress.print_now(&format!("{complete} complete; {pending} pending"))?;
653 }
654 Ok(())
655 }
656}