From 533acb3407b0de4551e74c83fca295e97cc0afe1 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 14 Mar 2026 14:55:11 +0100 Subject: [PATCH] Actively manage local archive-index cache --- Cargo.lock | 51 +- crates/bin/docs_rs_watcher/src/db/delete.rs | 8 +- crates/lib/docs_rs_storage/Cargo.toml | 16 +- .../benches/archive_index_cache.rs | 138 +++ .../lib/docs_rs_storage/src/archive_index.rs | 894 ++++++++++++++++-- crates/lib/docs_rs_storage/src/config.rs | 91 +- crates/lib/docs_rs_storage/src/metrics.rs | 1 + .../src/storage/non_blocking.rs | 18 +- .../docs_rs_storage/src/testing/test_env.rs | 5 +- 9 files changed, 1091 insertions(+), 131 deletions(-) create mode 100644 crates/lib/docs_rs_storage/benches/archive_index_cache.rs diff --git a/Cargo.lock b/Cargo.lock index fd8e944d0..19674ca85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,6 +380,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -1597,6 +1608,7 @@ dependencies = [ "serde", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -2339,7 +2351,6 @@ dependencies = [ "docs_rs_config", "docs_rs_env_vars", "docs_rs_headers", - "docs_rs_logging", "docs_rs_mimes", "docs_rs_opentelemetry", "docs_rs_rustdoc_json", @@ -2350,6 +2361,7 @@ dependencies = [ "http 1.4.0", "itertools 0.14.0", "mime", + "moka", "opentelemetry", "rand 0.10.0", "serde_json", @@ -2682,6 +2694,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "faster-hex" version = "0.10.0" @@ -5631,6 +5653,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "moka" +version = "0.12.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85f8024e1c8e71c778968af91d43700ce1d11b219d127d79fb2934153b82b42b" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener", + "futures-util", + "parking_lot", + "portable-atomic", + "smallvec", + "tagptr", + "uuid", +] + [[package]] name = "native-tls" version = "0.2.18" @@ -8228,6 +8270,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.44" @@ -8922,6 +8970,7 @@ version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ + "getrandom 0.4.2", "js-sys", "serde_core", "wasm-bindgen", diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index 141369be1..3b140f507 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -36,7 +36,8 @@ pub async fn delete_crate( // remove existing local archive index files. let local_index_folder = storage .config() - .local_archive_cache_path + .archive_index_cache + .path .join(&remote_folder); if local_index_folder.exists() { fs::remove_dir_all(&local_index_folder) @@ -76,7 +77,7 @@ pub async fn delete_version( .await?; } - let local_archive_cache = &storage.config().local_archive_cache_path; + let local_archive_cache = &storage.config().archive_index_cache.path; let mut paths = vec![source_archive_path(name, version)]; if is_library { paths.push(rustdoc_archive_path(name, version)); @@ -498,7 +499,8 @@ mod tests { assert!( !storage .config() - .local_archive_cache_path + .archive_index_cache + .path .join(&archive_index) .exists() ); diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index efd472711..d63398b12 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -7,9 +7,8 @@ edition.workspace = true [features] testing = [ "dep:rand", - "dep:docs_rs_logging", + "dep:dashmap", "docs_rs_config/testing", - "docs_rs_logging/testing", "docs_rs_opentelemetry/testing", ] @@ -22,11 +21,10 @@ aws-sdk-s3 = "1.3.0" aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } bzip2 = "0.6.0" chrono = { workspace = true } -dashmap = "6.0.0" +dashmap = { version = "6.0.0", optional = true } docs_rs_config = { path = "../docs_rs_config" } docs_rs_env_vars = { path = "../docs_rs_env_vars" } docs_rs_headers = { path = "../docs_rs_headers" } -docs_rs_logging = { path = "../docs_rs_logging", optional = true } docs_rs_mimes = { path = "../docs_rs_mimes" } docs_rs_opentelemetry = { path = "../docs_rs_opentelemetry" } docs_rs_rustdoc_json = { path = "../docs_rs_rustdoc_json" } @@ -37,6 +35,7 @@ futures-util = { workspace = true } http = { workspace = true } itertools = { workspace = true } mime = { workspace = true } +moka = { version = "0.12.14", features = ["future"] } opentelemetry = { workspace = true } rand = { workspace = true, optional = true } serde_json = { workspace = true } @@ -52,9 +51,9 @@ zip = { workspace = true } zstd = "0.13.0" [dev-dependencies] -criterion = "0.8.0" +criterion = { version = "0.8.0", features = ["async_tokio"] } +dashmap = "6.0.0" docs_rs_config = { path = "../docs_rs_config", features = ["testing"] } -docs_rs_logging = { path = "../docs_rs_logging", features = ["testing"] } docs_rs_opentelemetry = { path = "../docs_rs_opentelemetry", features = ["testing"] } rand = { workspace = true } test-case = { workspace = true } @@ -63,5 +62,10 @@ test-case = { workspace = true } name = "compression" harness = false +[[bench]] +name = "archive_index_cache" +harness = false +required-features = ["testing"] + [lints] workspace = true diff --git a/crates/lib/docs_rs_storage/benches/archive_index_cache.rs b/crates/lib/docs_rs_storage/benches/archive_index_cache.rs new file mode 100644 index 000000000..f390e5fdb --- /dev/null +++ b/crates/lib/docs_rs_storage/benches/archive_index_cache.rs @@ -0,0 +1,138 @@ +use anyhow::Result; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use docs_rs_opentelemetry::testing::TestMetrics; +use docs_rs_storage::{StorageKind, testing::TestStorage}; +use docs_rs_types::BuildId; +use futures_util::future::try_join_all; +use std::{ + path::Path, + sync::atomic::{AtomicI32, Ordering}, +}; +use tokio::{fs, runtime}; + +const ARCHIVE_PATH: &str = "bench/archive.zip"; +const FILE_IN_ARCHIVE: &str = "Cargo.toml"; + +async fn write_fixture_files(root: &Path) -> Result<()> { + fs::create_dir_all(root.join("src")).await?; + fs::write(root.join("Cargo.toml"), "[package]\nname = \"bench\"\n").await?; + fs::write(root.join("src/lib.rs"), "pub fn f() -> usize { 42 }\n").await?; + Ok(()) +} + +async fn create_storage_and_archive() -> Result { + let metrics = TestMetrics::new(); + let storage = TestStorage::from_kind(StorageKind::Memory, metrics.provider()).await?; + + let fixture_dir = tempfile::tempdir()?; + write_fixture_files(fixture_dir.path()).await?; + + storage + .store_all_in_archive(ARCHIVE_PATH, fixture_dir.path()) + .await?; + + Ok(storage) +} + +pub fn archive_index_cache(c: &mut Criterion) { + let runtime = runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let storage = runtime + .block_on(create_storage_and_archive()) + .expect("can't create test storage & archive"); + + runtime.block_on(async { + assert!( + storage + .exists_in_archive(ARCHIVE_PATH, Some(BuildId(1)), FILE_IN_ARCHIVE) + .await + .expect("initial exists-call failed") + ); + }); + + let mut group = c.benchmark_group("archive_index_cache"); + + group.bench_function( + BenchmarkId::new("hot_local_index_single", "exists_in_archive"), + |b| { + b.to_async(&runtime).iter(|| async { + assert!( + storage + .exists_in_archive(ARCHIVE_PATH, Some(BuildId(1)), FILE_IN_ARCHIVE) + .await + .unwrap() + ); + }); + }, + ); + + let cold_counter = AtomicI32::new(10_000); + group.bench_function( + BenchmarkId::new("cold_index_single", "exists_in_archive"), + |b| { + b.to_async(&runtime).iter(|| async { + let build_id = BuildId(cold_counter.fetch_add(1, Ordering::Relaxed)); + assert!( + storage + .exists_in_archive(ARCHIVE_PATH, Some(build_id), FILE_IN_ARCHIVE) + .await + .unwrap() + ); + }); + }, + ); + + let concurrent_counter = AtomicI32::new(20_000); + group.bench_function( + BenchmarkId::new("cold_index_concurrent_same_key_16", "exists_in_archive"), + |b| { + b.to_async(&runtime).iter(|| async { + let build_id = BuildId(concurrent_counter.fetch_add(1, Ordering::Relaxed)); + let futures = (0..16).map(|_| { + storage.exists_in_archive(ARCHIVE_PATH, Some(build_id), FILE_IN_ARCHIVE) + }); + + let results = try_join_all(futures).await.unwrap(); + assert!(results.into_iter().all(std::convert::identity)); + }); + }, + ); + + let recover_counter = AtomicI32::new(30_000); + group.bench_function( + BenchmarkId::new("purge_then_recover_single", "exists_in_archive"), + |b| { + b.to_async(&runtime).iter(|| async { + let build_id = BuildId(recover_counter.fetch_add(1, Ordering::Relaxed)); + assert!( + storage + .exists_in_archive(ARCHIVE_PATH, Some(build_id), FILE_IN_ARCHIVE) + .await + .unwrap() + ); + + let local_index_path = storage + .config() + .archive_index_cache + .path + .join(format!("{ARCHIVE_PATH}.{}.index", build_id.0)); + let _ = fs::remove_file(&local_index_path).await; + + assert!( + storage + .exists_in_archive(ARCHIVE_PATH, Some(build_id), FILE_IN_ARCHIVE) + .await + .unwrap() + ); + }); + }, + ); + + group.finish(); +} + +criterion_group!(archive_index_cache_benches, archive_index_cache); +criterion_main!(archive_index_cache_benches); diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index a55042029..e71c720de 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -1,35 +1,194 @@ -use crate::{Config, blob::StreamingBlob, types::FileRange}; +use crate::{ + PathNotFoundError, blob::StreamingBlob, config::ArchiveIndexCacheConfig, types::FileRange, + utils::file_list::walk_dir_recursive, +}; use anyhow::{Context as _, Result, anyhow, bail}; -use dashmap::DashMap; +use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm}; use docs_rs_utils::spawn_blocking; +use futures_util::TryStreamExt as _; +use moka::future::Cache as MokaCache; +use opentelemetry::{ + KeyValue, + metrics::{Counter, Gauge, Histogram}, +}; use sqlx::{ConnectOptions as _, Connection as _, QueryBuilder, Row as _, Sqlite}; use std::{ future::Future, path::{Path, PathBuf}, pin::Pin, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, }; use tokio::{ fs, io::{self, AsyncRead, AsyncSeek, AsyncWriteExt as _}, - sync::{Mutex, mpsc}, + sync::mpsc, + task::JoinHandle, }; use tokio_util::io::SyncIoBridge; -use tracing::{debug, instrument, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; pub(crate) const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index"; +/// dummy size we assume in case of errors +const DUMMY_FILE_SIZE: u64 = 1024 * 1024; // 1 MiB +/// self-repair attempts +const FIND_ATTEMPTS: usize = 3; + +#[derive(Debug)] +struct Metrics { + // calls to find an entry in the local cache + find_calls: Counter, + + // local cache eviction + evicted_entries: Counter, + evicted_bytes_total: Counter, + evicted_entry_size: Histogram, + + // local cache misses / downloads & bytes + // includes & doesn't differentiate retries / repairs for now + downloads: Counter, + downloaded_bytes: Counter, + downloaded_entry_size: Histogram, + + // full cache size (count / bytes) + weighted_size_bytes: Gauge, + entry_count: Gauge, +} + +impl Metrics { + fn new(meter_provider: &AnyMeterProvider) -> Self { + let meter = meter_provider.meter("storage"); + const PREFIX: &str = "docsrs.storage.archive_index_cache"; + const KIB: f64 = 1024.0; + const MIB: f64 = 1024.0 * KIB; + const GIB: f64 = 1024.0 * MIB; + + let entry_size_boundaries = vec![ + 500.0 * KIB, + 1.0 * MIB, + 2.0 * MIB, + 4.0 * MIB, + 8.0 * MIB, + 16.0 * MIB, + 32.0 * MIB, + 64.0 * MIB, + 128.0 * MIB, + 256.0 * MIB, + 512.0 * MIB, + 1.0 * GIB, + 2.0 * GIB, + 4.0 * GIB, + 8.0 * GIB, + 10.0 * GIB, + ]; + + Self { + find_calls: meter + .u64_counter(format!("{PREFIX}.find_total")) + .with_unit("1") + .build(), + downloads: meter + .u64_counter(format!("{PREFIX}.download_total")) + .with_unit("1") + .build(), + downloaded_bytes: meter + .u64_counter(format!("{PREFIX}.download_bytes_total")) + .with_unit("By") + .build(), + evicted_entries: meter + .u64_counter(format!("{PREFIX}.eviction_total")) + .with_unit("1") + .build(), + evicted_bytes_total: meter + .u64_counter(format!("{PREFIX}.evicted_bytes_total")) + .with_unit("By") + .build(), + evicted_entry_size: meter + .u64_histogram(format!("{PREFIX}.evicted_entry_size")) + .with_unit("By") + .with_boundaries(entry_size_boundaries.clone()) + .build(), + downloaded_entry_size: meter + .u64_histogram(format!("{PREFIX}.downloaded_entry_size")) + .with_unit("By") + .with_boundaries(entry_size_boundaries) + .build(), + weighted_size_bytes: meter + .u64_gauge(format!("{PREFIX}.weighted_size_bytes")) + .with_unit("By") + .build(), + entry_count: meter + .u64_gauge(format!("{PREFIX}.entry_count")) + .with_unit("1") + .build(), + } + } +} + #[derive(PartialEq, Eq, Debug)] pub(crate) struct FileInfo { range: FileRange, compression: CompressionAlgorithm, } +struct Entry { + // file size of the local sqlite database. + // Will be used to "weigh" cache entries, so that the cache can evict based on + // total size of cached files instead of number of entries. + file_size_kib: u32, +} + +impl Entry { + fn from_size(file_size: u64) -> Self { + let file_size_kib = file_size.div_ceil(1024).max(1).min(u32::MAX as u64) as u32; + Self { file_size_kib } + } + + async fn from_path(path: impl AsRef) -> Self { + let path = path.as_ref(); + Self::from_size(match fs::metadata(&path).await { + Ok(meta) => meta.len(), + Err(err) => { + warn!( + ?err, + ?path, + "failed to get metadata for local archive index file, using dummy size for cache eviction" + ); + DUMMY_FILE_SIZE + } + }) + } +} + +type CacheManager = MokaCache>; + +/// Local archive index cache. +/// +/// Note: "last access" times for cache entries reset on each server startup +/// (the moka cache starts empty and gets backfilled from disk without +/// preserving prior access timestamps). This means TTI-based eviction is +/// uninformed until real traffic re-establishes usage patterns. +/// +/// This is acceptable because: +/// - Builds happen infrequently (every couple of months), so cached index +/// data stays valid for a long time. Serving it for an extra TTL window +/// after a restart is harmless. +/// - moka's TinyLFU-based eviction policy adapts quickly once traffic +/// resumes. +/// - Persisting access timestamps would add significant complexity +/// (moka doesn't support injecting custom timestamps on insert) for +/// marginal benefit. pub(crate) struct Cache { - local_archive_cache_path: PathBuf, - /// Locks to synchronize write-access to the locally cached archive index files. - locks: DashMap>>, + config: Arc, + /// Tracks locally cached archive indices and coordinates their initialization & invalidation. + manager: CacheManager, + metrics: Arc, + background_tasks: Vec>, } pub(crate) trait Downloader { @@ -40,30 +199,227 @@ pub(crate) trait Downloader { } impl Cache { - pub(crate) fn new(config: &Config) -> Self { - Self { - local_archive_cache_path: config.local_archive_cache_path.clone(), - locks: DashMap::with_capacity(config.local_archive_cache_expected_count), + /// create a new archive index cache. + /// + /// Also starts a background task that will backfill the in-memory cache management based + /// on the local files that are already. + pub(crate) async fn new( + config: Arc, + meter_provider: &AnyMeterProvider, + ) -> Result { + let mut cache = Self::new_inner(config.clone(), meter_provider).await?; + + cache.background_tasks.push(tokio::spawn({ + let manager = cache.manager.clone(); + async move { + if let Err(err) = Self::backfill_cache_manager(config, manager).await { + error!(?err, "failed to backfill archive index cache manager"); + } + } + })); + + Ok(cache) + } + + /// create a new archive index cache, and directly backfill the in-memory structures. + /// + /// Only for testing. + #[cfg(test)] + async fn new_with_backfill( + config: Arc, + meter_provider: &AnyMeterProvider, + ) -> Result { + let cache = Self::new_inner(config.clone(), meter_provider).await?; + + Self::backfill_cache_manager(config, cache.manager.clone()) + .await + .context("failed to backfill archive index cache manager")?; + + Ok(cache) + } + + async fn new_inner( + config: Arc, + meter_provider: &AnyMeterProvider, + ) -> Result { + fs::create_dir_all(&config.path) + .await + .context("failed to create archive index cache directory")?; + + let metrics = Arc::new(Metrics::new(meter_provider)); + let metrics_for_eviction = metrics.clone(); + let manager = CacheManager::builder() + .initial_capacity(config.expected_count) + // Time to idle (TTI): A cached entry will be expired after + // the specified duration past from get or insert. + // We don't set TTL (time to live), which would be just time-after-insert. + .time_to_idle(config.ttl) + // we weigh each cache entry by the file size of the sqlite database. + // The max size of the cache for all of docs.rs is 500 GiB at the time of writing. + // In KiB, this would be around 500k, which makes KiB the right unit. + // Anything bigger (like MiB) would mean that we count smaller dbs than 1 MiB as if + // they were 1 MiB big. + .weigher(|_key: &PathBuf, entry: &Arc| -> u32 { entry.file_size_kib }) + // max capacity + // not entries, but _weighted entries_. + // with the weight fn from above, the max capacity is a storage size value. + .max_capacity(config.max_size_mb * 1024) + // the eviction listener is called when moka evicts a cache entry. + // In this case we want to delete the corresponding local files. + .eviction_listener(move |path, entry, reason| { + let path = path.to_path_buf(); + let metrics = metrics_for_eviction.clone(); + // The spawned task means file deletion is deferred. See the + // "benign race with the eviction listener" comment in `find_inner` + // for why this is acceptable. + tokio::spawn(async move { + let reason = format!("{reason:?}"); + let evicted_bytes = entry.file_size_kib as u64 * 1024; + let reason_attr = [KeyValue::new("cause", reason.clone())]; + + metrics.evicted_entries.add(1, &reason_attr); + metrics.evicted_bytes_total.add(evicted_bytes, &reason_attr); + metrics + .evicted_entry_size + .record(evicted_bytes, &reason_attr); + + trace!( + ?path, + ?reason_attr, + "evicting local archive index file from cache" + ); + if let Err(err) = Self::remove_local_index(&path).await { + error!( + ?err, + ?path, + ?reason, + "failed to remove local archive index file on cache eviction" + ); + } + }); + }) + .build(); + + let handle = tokio::spawn({ + let manager = manager.clone(); + let metrics = metrics.clone(); + + // moka will also run maintenance tasks itself, but I want to force this + // at least every 30 seconds. + // + // We also use this background task to gather metrics. + async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + loop { + interval.tick().await; + + debug!("running pending tasks for archive index cache manager"); + manager.run_pending_tasks().await; + + debug!("collect cache size metrics"); + metrics.entry_count.record(manager.entry_count(), &[]); + metrics + .weighted_size_bytes + .record(manager.weighted_size() * 1024, &[]); + } + } + }); + + let cache = Self { + manager, + config, + metrics, + background_tasks: vec![handle], + }; + Ok(cache) + } + + /// run any pending tasks, like evictions that need to delete local files. + #[cfg(test)] + async fn flush(&self) -> Result<()> { + self.manager.run_pending_tasks().await; + Ok(()) + } + + #[cfg(test)] + async fn backfill(&self) -> Result<()> { + Self::backfill_cache_manager(self.config.clone(), self.manager.clone()).await + } + + /// backfill the in memory cache management based on the local files that are already + /// present on disk. + /// + /// Should be needed only once after server startup. + /// + /// While this is running, our `find_inner` & `download_archive_index` logic will just + /// fill it itself. + /// + /// Concurrency is set to a lower value intentionally so we don't put + /// too much i/o pressure onto the disk. + #[instrument(skip_all)] + async fn backfill_cache_manager( + config: Arc, + manager: CacheManager, + ) -> Result<()> { + info!(path=%config.path.display(), "starting cache-manager backfill from local directory"); + let inserted = Arc::new(AtomicU64::new(0)); + + walk_dir_recursive(&config.path) + .err_into::() + .try_for_each_concurrent(Some(4), |item| { + let manager = manager.clone(); + let inserted = inserted.clone(); + async move { + let path = item.absolute; + if path.extension().and_then(|ext| ext.to_str()) + == Some(ARCHIVE_INDEX_FILE_EXTENSION) + { + let entry = manager + .entry(path) + .or_insert_with(async { + Arc::new(Entry::from_size(item.metadata.len())) + }) + .await; + + if entry.is_fresh() { + inserted.fetch_add(1, Ordering::Relaxed); + } + } + Ok(()) + } + }) + .await?; + + info!( + inserted_count = inserted.load(Ordering::Relaxed), + "finished cache-manager backfill" + ); + Ok(()) + } + + async fn remove_local_index(path: impl AsRef) -> Result<()> { + let path = path.as_ref(); + for ext in &["wal", "shm"] { + let to_delete = path.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-{ext}")); + let _ = fs::remove_file(&to_delete).await; + } + + if let Err(err) = fs::remove_file(&path).await + && err.kind() != io::ErrorKind::NotFound + { + Err(err.into()) + } else { + Ok(()) } } fn local_index_path(&self, archive_path: &str, latest_build_id: Option) -> PathBuf { - self.local_archive_cache_path.join(format!( + self.config.path.join(format!( "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}", latest_build_id.map(|id| id.0).unwrap_or(0) )) } - fn local_index_cache_lock(&self, local_index_path: impl AsRef) -> Arc> { - let local_index_path = local_index_path.as_ref().to_path_buf(); - - self.locks - .entry(local_index_path) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .downgrade() - .clone() - } - /// purge a single archive index file pub(crate) async fn purge( &self, @@ -71,18 +427,8 @@ impl Cache { latest_build_id: Option, ) -> Result<()> { let local_index_path = self.local_index_path(archive_path, latest_build_id); - let rwlock = self.local_index_cache_lock(&local_index_path); - let _write_guard = rwlock.lock().await; - - for ext in &["wal", "shm"] { - let to_delete = - local_index_path.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-{ext}")); - let _ = fs::remove_file(&to_delete).await; - } - - if fs::try_exists(&local_index_path).await? { - fs::remove_file(&local_index_path).await?; - } + Self::remove_local_index(&local_index_path).await?; + self.manager.invalidate(&local_index_path).await; Ok(()) } @@ -97,29 +443,83 @@ impl Cache { let local_index_path = self.local_index_path(archive_path, latest_build_id); // fast path: try to use whatever is there, no locking - match find_in_file(&local_index_path, path_in_archive).await { + let force_redownload = match find_in_file(&local_index_path, path_in_archive).await { Ok(res) => return Ok(res), Err(err) => { + let force_redownload = !err.is::(); debug!(?err, "archive index lookup failed, will try repair."); + force_redownload } - } - - let lock = self.local_index_cache_lock(&local_index_path); - let write_guard = lock.lock().await; - - // Double-check: maybe someone fixed it between our first failure and now. - if let Ok(res) = find_in_file(&local_index_path, path_in_archive).await { - return Ok(res); - } + }; let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}"); - // We are the repairer: download fresh index into place. - self.download_archive_index(downloader, &local_index_path, &remote_index_path) - .await?; - - // Write lock is dropped here (end of scope), so others can proceed. - drop(write_guard); + // moka will coalesce all concurrent calls to try_get_with_by_ref with the same key + // into a single call to the async closure. + // https://docs.rs/moka/0.12.14/moka/future/struct.Cache.html#concurrent-calls-on-the-same-key + // So we don't need any locking here to prevent multiple downloads for the same + // missing archive index. + self.manager + .try_get_with_by_ref(&local_index_path, async { + // NOTE: benign race with the eviction listener. + // + // When moka evicts an entry (time/size pressure), it removes it from the + // cache immediately but runs the eviction listener later (via a spawned + // tokio task that deletes the local file). + // + // If a new request arrives between the cache removal and the file deletion: + // 1. Cache miss → we enter this closure. + // 2. `try_exists` → true (file not deleted yet). + // 3. We re-insert the existing file into the cache. + // 4. The eviction listener's spawned task then runs and deletes the file + // out from under us. + // 5. The next `find` call fails on the fast path (file gone), falls back + // into this closure, sees `try_exists` → false, and re-downloads. + // + // Net impact: one request pays the cost of an extra S3 download. No error + // is visible to the user since the self-repair logic handles it. + let entry = if !force_redownload && fs::try_exists(&local_index_path).await? { + // after server startup we might have local indexes that don't + // yet exist in our cache manager. + // So we only need to download if the file doesn't exist. + Entry::from_path(&local_index_path).await + } else { + if force_redownload { + Self::remove_local_index(&local_index_path).await?; + } + Entry::from_size( + self.download_archive_index( + downloader, + &local_index_path, + &remote_index_path, + ) + .await?, + ) + }; + Ok::<_, anyhow::Error>(Arc::new(entry)) + }) + .await + .map_err(|arc_err: Arc| { + // We can't convert this Arc into the inner error type. + // See https://github.com/moka-rs/moka/issues/497 + // But since some callers are specifically checking + // ::is to differentiate other errors from + // the "not found" case, we want to preserve that information + // if it was the cause of the error. + // + // This mean all error types that we later want to use with ::is<> or + // ::downcast<> have to be mentioned here. + // + // While we could also migrate to a custom enum error type, this would + // only be really nice when the whole storage lib uses is. Otherwise + // we'll end up with some hardcoded conversions again. + // So I can leave it as-is for now. + if arc_err.is::() { + anyhow!(PathNotFoundError) + } else { + anyhow!(arc_err) + } + })?; // Final attempt: if this still fails, bubble the error. find_in_file(local_index_path, path_in_archive).await @@ -136,20 +536,39 @@ impl Cache { path_in_archive: &str, downloader: &impl Downloader, ) -> Result> { - for attempt in 0..2 { + for attempt in 1..=FIND_ATTEMPTS { match self .find_inner(archive_path, latest_build_id, path_in_archive, downloader) .await { - Ok(file_info) => return Ok(file_info), - Err(err) if attempt == 0 => { + Ok(file_info) => { + self.metrics.find_calls.add( + 1, + &[ + KeyValue::new("attempt", attempt.to_string()), + KeyValue::new("outcome", "success"), + ], + ); + return Ok(file_info); + } + Err(err) if attempt < FIND_ATTEMPTS => { warn!( ?err, - "error resolving archive index, purging local cache and retrying once" + %attempt, + "error resolving archive index, purging local cache and retrying" ); self.purge(archive_path, latest_build_id).await?; } - Err(err) => return Err(err), + Err(err) => { + self.metrics.find_calls.add( + 1, + &[ + KeyValue::new("attempt", attempt.to_string()), + KeyValue::new("outcome", "error"), + ], + ); + return Err(err); + } } } @@ -162,7 +581,7 @@ impl Cache { downloader: &impl Downloader, local_index_path: &Path, remote_index_path: &str, - ) -> Result<()> { + ) -> Result { let parent = local_index_path .parent() .ok_or_else(|| anyhow!("index path without parent"))? @@ -171,7 +590,7 @@ impl Cache { // Create a unique temp file in the cache folder. let (temp_file, mut temp_path) = spawn_blocking({ - let folder = self.local_archive_cache_path.clone(); + let folder = self.config.path.clone(); move || -> Result<_> { tempfile::NamedTempFile::new_in(&folder).map_err(Into::into) } }) .await? @@ -183,15 +602,28 @@ impl Cache { .fetch_archive_index(remote_index_path) .await? .content; - io::copy(&mut stream, &mut temp_file).await?; + let copied = io::copy(&mut stream, &mut temp_file).await?; temp_file.flush().await?; - temp_path.disable_cleanup(true); // Publish atomically. // Will replace any existing file. fs::rename(&temp_path, local_index_path).await?; - Ok(()) + temp_path.disable_cleanup(true); + + self.metrics.downloads.add(1, &[]); + self.metrics.downloaded_bytes.add(copied, &[]); + self.metrics.downloaded_entry_size.record(copied, &[]); + + Ok(copied) + } +} + +impl Drop for Cache { + fn drop(&mut self) { + for task in &self.background_tasks { + task.abort(); + } } } @@ -204,7 +636,7 @@ impl FileInfo { } } -/// crates a new empty SQLite database, and returns a configured connection +/// creates a new empty SQLite database, and returns a configured connection /// pool to connect to the DB. /// Any existing DB at the given path will be deleted first. async fn sqlite_create>(path: P) -> Result { @@ -280,7 +712,7 @@ where let start = entry .data_start() - .ok_or_else(|| anyhow!("missing data_start in zip derectory"))?; + .ok_or_else(|| anyhow!("missing data_start in zip directory"))?; let end = start + entry.compressed_size() - 1; let compression_raw = match entry.compression() { zip::CompressionMethod::Bzip2 => compression_bzip, @@ -385,8 +817,10 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{Config, blob::StreamingBlob, types::StorageKind}; + use crate::blob::StreamingBlob; use chrono::Utc; + use docs_rs_config::AppConfig as _; + use docs_rs_opentelemetry::testing::TestMetrics; use sqlx::error::DatabaseError as _; use std::{collections::HashMap, io::Cursor, ops::Deref, pin::Pin, sync::Arc}; use zip::write::SimpleFileOptions; @@ -475,6 +909,61 @@ mod tests { } } + struct FlakyDownloader { + remote_index_path: String, + payload: Vec, + fail_until: usize, + fetch_count: std::sync::Mutex, + } + + impl FlakyDownloader { + fn new(remote_index_path: String, payload: Vec, fail_until: usize) -> Self { + Self { + remote_index_path, + payload, + fail_until, + fetch_count: std::sync::Mutex::new(0), + } + } + + fn fetch_count(&self) -> usize { + *self.fetch_count.lock().unwrap() + } + } + + impl Downloader for FlakyDownloader { + fn fetch_archive_index<'a>( + &'a self, + remote_index_path: &'a str, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + if remote_index_path != self.remote_index_path { + bail!( + "unexpected remote index path: expected {}, got {remote_index_path}", + self.remote_index_path + ); + } + + let mut fetch_count = self.fetch_count.lock().unwrap(); + *fetch_count += 1; + if *fetch_count <= self.fail_until { + bail!("synthetic download failure {fetch_count}"); + } + + let content = self.payload.clone(); + Ok(StreamingBlob { + path: remote_index_path.to_string(), + mime: mime::APPLICATION_OCTET_STREAM, + date_updated: Utc::now(), + etag: None, + compression: None, + content_length: content.len(), + content: Box::new(Cursor::new(content)), + }) + }) + } + } + async fn create_index_bytes(file_count: u32) -> Result> { let tf = create_test_archive(file_count).await?; let tempfile = tempfile::NamedTempFile::new()?.into_temp_path(); @@ -483,8 +972,8 @@ mod tests { } struct TestEnv { - _cache_dir: tempfile::TempDir, - _config: Config, + _collected_metrics: TestMetrics, + config: Arc, cache: Cache, } @@ -496,15 +985,15 @@ mod tests { } } - fn test_cache() -> Result { - let cache_dir = tempfile::tempdir()?; - let mut config = Config::test_config_with_kind(StorageKind::Memory)?; - config.local_archive_cache_path = cache_dir.path().to_path_buf(); - let cache = Cache::new(&config); + async fn test_cache() -> Result { + let config = Arc::new(ArchiveIndexCacheConfig::test_config()?); + let meter_provider = TestMetrics::new(); + let cache = Cache::new_with_backfill(config.clone(), meter_provider.provider()).await?; + Ok(TestEnv { - _cache_dir: cache_dir, - _config: config, + _collected_metrics: meter_provider, cache, + config, }) } @@ -562,7 +1051,7 @@ mod tests { #[tokio::test] async fn outdated_local_archive_index_gets_redownloaded() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(42)); const ARCHIVE_NAME: &str = "test.zip"; @@ -600,7 +1089,7 @@ mod tests { #[tokio::test] async fn find_uses_local_cache_without_downloading() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; const FILE_IN_ARCHIVE: &str = "testfile0"; @@ -624,7 +1113,7 @@ mod tests { #[tokio::test] async fn find_downloads_when_local_cache_missing() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; const FILE_IN_ARCHIVE: &str = "testfile0"; @@ -647,7 +1136,7 @@ mod tests { #[tokio::test] async fn find_returns_none_for_missing_entry() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; @@ -668,7 +1157,7 @@ mod tests { #[tokio::test] async fn find_retries_once_then_errors() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; @@ -694,14 +1183,37 @@ mod tests { .message(), "file is not a database" ); - assert_eq!(downloader.download_count(&remote_index_path), 2); + assert_eq!(downloader.download_count(&remote_index_path), FIND_ATTEMPTS); + + Ok(()) + } + + #[tokio::test] + async fn corrupted_local_index_uses_first_attempt_for_redownload() -> Result<()> { + let cache = test_cache().await?; + const LATEST_BUILD_ID: Option = Some(BuildId(808)); + const ARCHIVE_NAME: &str = "corrupt-first-attempt-redownload.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let cache_file = cache.local_index_path(ARCHIVE_NAME, LATEST_BUILD_ID); + fs::create_dir_all(cache_file.parent().unwrap()).await?; + fs::write(&cache_file, b"not-an-sqlite-index").await?; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let downloader = FlakyDownloader::new(remote_index_path, create_index_bytes(1).await?, 2); + + let result = cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await?; + assert!(result.is_some()); + assert_eq!(downloader.fetch_count(), FIND_ATTEMPTS); Ok(()) } #[tokio::test] async fn purge_removes_index_wal_and_shm() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; @@ -725,16 +1237,230 @@ mod tests { #[tokio::test] async fn purge_is_idempotent_when_files_missing() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; cache.purge("missing.zip", Some(BuildId(7))).await?; cache.purge("missing.zip", Some(BuildId(7))).await?; Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn manager_invalidate_removes_index_wal_and_shm_via_eviction_listener() -> Result<()> { + let cache = test_cache().await?; + let local_index = cache.local_index_path("listener-remove.zip", Some(BuildId(17))); + let wal = local_index.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-wal")); + let shm = local_index.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-shm")); + + fs::create_dir_all(local_index.parent().unwrap()).await?; + fs::write(&local_index, b"index").await?; + fs::write(&wal, b"wal").await?; + fs::write(&shm, b"shm").await?; + + cache + .manager + .insert(local_index.clone(), Arc::new(Entry::from_size(5))) + .await; + + cache.manager.invalidate(&local_index).await; + cache.flush().await?; + // The eviction listener deletes files in a spawned task; + // give it time to complete on the multi-thread runtime. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + assert!(!fs::try_exists(&local_index).await?); + assert!(!fs::try_exists(&wal).await?); + assert!(!fs::try_exists(&shm).await?); + + Ok(()) + } + + #[tokio::test] + async fn purge_invalidates_manager_so_next_find_redownloads() -> Result<()> { + let cache = test_cache().await?; + const LATEST_BUILD_ID: Option = Some(BuildId(23)); + const ARCHIVE_NAME: &str = "purge-redownload.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.clone(), create_index_bytes(1).await?); + + assert!( + cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert_eq!(downloader.download_count(&remote_index_path), 1); + + cache.purge(ARCHIVE_NAME, LATEST_BUILD_ID).await?; + + assert!( + cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert_eq!(downloader.download_count(&remote_index_path), 2); + + Ok(()) + } + + #[tokio::test] + async fn purge_for_build_id_does_not_invalidate_other_build_id() -> Result<()> { + let cache = test_cache().await?; + const BUILD_ID_A: Option = Some(BuildId(101)); + const BUILD_ID_B: Option = Some(BuildId(202)); + const ARCHIVE_NAME: &str = "build-id-isolation.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let local_a = cache.local_index_path(ARCHIVE_NAME, BUILD_ID_A); + let local_b = cache.local_index_path(ARCHIVE_NAME, BUILD_ID_B); + fs::create_dir_all(local_a.parent().unwrap()).await?; + let index_bytes = create_index_bytes(1).await?; + fs::write(&local_a, &index_bytes).await?; + fs::write(&local_b, &index_bytes).await?; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.clone(), index_bytes.clone()); + + assert!( + cache + .find(ARCHIVE_NAME, BUILD_ID_A, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert!( + cache + .find(ARCHIVE_NAME, BUILD_ID_B, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert_eq!(downloader.download_count(&remote_index_path), 0); + + cache.purge(ARCHIVE_NAME, BUILD_ID_A).await?; + + assert!( + cache + .find(ARCHIVE_NAME, BUILD_ID_A, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert_eq!(downloader.download_count(&remote_index_path), 1); + + assert!( + cache + .find(ARCHIVE_NAME, BUILD_ID_B, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert_eq!(downloader.download_count(&remote_index_path), 1); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn purge_during_inflight_find_does_not_break_recovery() -> Result<()> { + let cache = Arc::new(test_cache().await?); + const LATEST_BUILD_ID: Option = Some(BuildId(303)); + const ARCHIVE_NAME: &str = "inflight-purge.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::with_delay(std::time::Duration::from_millis(150)); + downloader + .indices + .insert(remote_index_path.clone(), create_index_bytes(1).await?); + let downloader = Arc::new(downloader); + + let find_task = { + let cache = cache.clone(); + let downloader = downloader.clone(); + tokio::spawn(async move { + cache + .find( + ARCHIVE_NAME, + LATEST_BUILD_ID, + FILE_IN_ARCHIVE, + downloader.as_ref(), + ) + .await + }) + }; + + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + cache.purge(ARCHIVE_NAME, LATEST_BUILD_ID).await?; + + let result = find_task.await??; + assert!(result.is_some()); + assert!(downloader.download_count(&remote_index_path) <= 2); + + let second = cache + .find( + ARCHIVE_NAME, + LATEST_BUILD_ID, + FILE_IN_ARCHIVE, + downloader.as_ref(), + ) + .await?; + assert!(second.is_some()); + assert!(downloader.download_count(&remote_index_path) <= 2); + + Ok(()) + } + + #[tokio::test] + async fn backfill_then_find_uses_backfilled_entry_without_download_when_file_exists() + -> Result<()> { + let cache = test_cache().await?; + const LATEST_BUILD_ID: Option = Some(BuildId(404)); + const ARCHIVE_NAME: &str = "backfill-preexisting.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let local_index = cache.config.path.join(format!( + "{ARCHIVE_NAME}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}", + LATEST_BUILD_ID.unwrap().0 + )); + fs::create_dir_all(local_index.parent().unwrap()).await?; + fs::write(&local_index, create_index_bytes(1).await?).await?; + + cache.backfill().await?; + + assert!(cache.manager.get(&local_index).await.is_some()); + + let downloader = FakeDownloader::new(); + let result = cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await?; + assert!(result.is_some()); + assert_eq!( + downloader.download_count(&format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}")), + 0 + ); + + Ok(()) + } + + #[tokio::test] + async fn backfill_skips_non_index_files() -> Result<()> { + let cache = test_cache().await?; + let non_index_file = cache.config.path.join("not-an-index.tmp"); + fs::create_dir_all(&cache.config.path).await?; + fs::write(&non_index_file, b"junk").await?; + + assert!(cache.manager.get(&non_index_file).await.is_none()); + + Ok(()) + } + #[tokio::test] async fn download_archive_index_overwrites_existing_file() -> Result<()> { - let cache = test_cache()?; + let cache = test_cache().await?; let local_index = cache.local_index_path("test.zip", Some(BuildId(7))); fs::create_dir_all(local_index.parent().unwrap()).await?; fs::write(&local_index, b"old").await?; @@ -758,8 +1484,8 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn concurrent_find_triggers_single_download_per_index() -> Result<()> { - let cache = test_cache()?; - let cache = Arc::new(cache.cache); + let cache = test_cache().await?; + let cache = Arc::new(cache); const N: usize = 16; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; diff --git a/crates/lib/docs_rs_storage/src/config.rs b/crates/lib/docs_rs_storage/src/config.rs index 334300ea6..af2ccdd8b 100644 --- a/crates/lib/docs_rs_storage/src/config.rs +++ b/crates/lib/docs_rs_storage/src/config.rs @@ -4,6 +4,8 @@ use docs_rs_env_vars::{env, maybe_env, require_env}; use std::{ io, path::{self, Path, PathBuf}, + sync::Arc, + time::Duration, }; fn ensure_absolute_path(path: PathBuf) -> io::Result { @@ -14,6 +16,63 @@ fn ensure_absolute_path(path: PathBuf) -> io::Result { } } +#[derive(Debug)] +pub struct ArchiveIndexCacheConfig { + // where do we want to store the locally cached index files + // for the remote archives? + pub path: PathBuf, + + // maximum disk space for the local archive index cache. + pub max_size_mb: u64, + + // TTL for the local index cache + pub ttl: Duration, + + // expected number of entries in the local archive cache. + // Makes server restarts faster by preallocating some data structures. + // General numbers (as of 2025-12): + // * we have ~1.5 mio releases with archive storage (and 400k without) + // * each release has on average 2 archive files (rustdoc, source) + // so, over all, 3 mio archive index files in S3. + // + // While due to crawlers we will download _all_ of them over time, the old + // metric "releases accessed in the last 10 minutes" was around 50k, if I + // recall correctly. + // We use this to pre-allocate the in-memory cache so it can avoid + // resizes during early traffic. + pub expected_count: usize, +} + +impl AppConfig for ArchiveIndexCacheConfig { + fn from_environment() -> anyhow::Result { + let prefix: PathBuf = require_env("DOCSRS_PREFIX")?; + Ok(Self { + path: ensure_absolute_path(env( + "DOCSRS_ARCHIVE_INDEX_CACHE_PATH", + prefix.join("archive_cache"), + )?)?, + max_size_mb: env( + "DOCSRS_ARCHIVE_INDEX_CACHE_MAX_SIZE_MB", + 50 * 1024, // 50 GiB + )?, + ttl: Duration::from_secs(env( + "DOCSRS_ARCHIVE_INDEX_CACHE_TTL", + 24 * 60 * 60, // 24 hours + )?), + expected_count: env("DOCSRS_ARCHIVE_INDEX_EXPECTED_COUNT", 100_000usize)?, + }) + } + + #[cfg(any(feature = "testing", test))] + fn test_config() -> anyhow::Result { + let mut config = Self::from_environment()?; + config.path = + std::env::temp_dir().join(format!("docsrs-test-index-{}", rand::random::())); + + Ok(config) + } +} + #[derive(Debug)] pub struct Config { pub temp_dir: PathBuf, @@ -39,24 +98,8 @@ pub struct Config { pub max_file_size: usize, pub max_file_size_html: usize, - // where do we want to store the locally cached index files - // for the remote archives? - pub local_archive_cache_path: PathBuf, - - // expected number of entries in the local archive cache. - // Makes server restarts faster by preallocating some data structures. - // General numbers (as of 2025-12): - // * we have ~1.5 mio releases with archive storage (and 400k without) - // * each release has on average 2 archive files (rustdoc, source) - // so, over all, 3 mio archive index files in S3. - // - // While due to crawlers we will download _all_ of them over time, the old - // metric "releases accessed in the last 10 minutes" was around 50k, if I - // recall correctly. - // We're using a local DashMap to store some locks for these indexes, - // and we already know in advance we need these 50k entries. - // So we can preallocate the DashMap with this number to avoid resizes. - pub local_archive_cache_expected_count: usize, + // config for the local archive index cache + pub archive_index_cache: Arc, // How much we want to parallelize local filesystem logic. // For pure I/O this could be quite high (32/64), but @@ -76,14 +119,7 @@ impl AppConfig for Config { s3_bucket: env("DOCSRS_S3_BUCKET", "rust-docs-rs".to_string())?, s3_region: env("S3_REGION", "us-west-1".to_string())?, s3_endpoint: maybe_env("S3_ENDPOINT")?, - local_archive_cache_path: ensure_absolute_path(env( - "DOCSRS_ARCHIVE_INDEX_CACHE_PATH", - prefix.join("archive_cache"), - )?)?, - local_archive_cache_expected_count: env( - "DOCSRS_ARCHIVE_INDEX_EXPECTED_COUNT", - 100_000usize, - )?, + archive_index_cache: Arc::new(ArchiveIndexCacheConfig::from_environment()?), max_file_size: env("DOCSRS_MAX_FILE_SIZE", 50 * 1024 * 1024)?, max_file_size_html: env("DOCSRS_MAX_FILE_SIZE_HTML", 50 * 1024 * 1024)?, #[cfg(any(test, feature = "testing"))] @@ -127,8 +163,7 @@ impl Config { let mut config = Self::from_environment()?; config.storage_backend = kind; - config.local_archive_cache_path = - std::env::temp_dir().join(format!("docsrs-test-index-{}", rand::random::())); + config.archive_index_cache = Arc::new(ArchiveIndexCacheConfig::test_config()?); // Use a temporary S3 bucket, only used when storage_kind is set to S3 in env or later. config.s3_bucket = format!("docsrs-test-bucket-{}", rand::random::()); diff --git a/crates/lib/docs_rs_storage/src/metrics.rs b/crates/lib/docs_rs_storage/src/metrics.rs index 644b2be92..53bcd34f6 100644 --- a/crates/lib/docs_rs_storage/src/metrics.rs +++ b/crates/lib/docs_rs_storage/src/metrics.rs @@ -10,6 +10,7 @@ impl StorageMetrics { pub(crate) fn new(meter_provider: &AnyMeterProvider) -> Self { let meter = meter_provider.meter("storage"); const PREFIX: &str = "docsrs.storage"; + Self { uploaded_files: meter .u64_counter(format!("{PREFIX}.uploaded_files")) diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index 7e949d595..8739b54dc 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -15,7 +15,7 @@ use crate::{ storage_path::{rustdoc_archive_path, source_archive_path}, }, }; -use anyhow::Result; +use anyhow::{Context as _, Result}; use docs_rs_mimes::{self as mimes, detect_mime}; use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; @@ -33,15 +33,20 @@ pub struct AsyncStorage { impl AsyncStorage { pub async fn new(config: Arc, otel_meter_provider: &AnyMeterProvider) -> Result { - let otel_metrics = StorageMetrics::new(otel_meter_provider); + let metrics = StorageMetrics::new(otel_meter_provider); Ok(Self { + archive_index_cache: archive_index::Cache::new( + config.archive_index_cache.clone(), + otel_meter_provider, + ) + .await + .context("initialize archive index cache")?, backend: match config.storage_backend { #[cfg(any(test, feature = "testing"))] - StorageKind::Memory => StorageBackend::Memory(MemoryBackend::new(otel_metrics)), - StorageKind::S3 => StorageBackend::S3(S3Backend::new(&config, otel_metrics).await?), + StorageKind::Memory => StorageBackend::Memory(MemoryBackend::new(metrics)), + StorageKind::S3 => StorageBackend::S3(S3Backend::new(&config, metrics).await?), }, - archive_index_cache: archive_index::Cache::new(&config), config, }) } @@ -847,7 +852,8 @@ mod backend_tests { let local_index_location = storage .config - .local_archive_cache_path + .archive_index_cache + .path .join(format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}")); let (stored_files, compression_alg) = storage diff --git a/crates/lib/docs_rs_storage/src/testing/test_env.rs b/crates/lib/docs_rs_storage/src/testing/test_env.rs index 884cc44ad..6380f18bc 100644 --- a/crates/lib/docs_rs_storage/src/testing/test_env.rs +++ b/crates/lib/docs_rs_storage/src/testing/test_env.rs @@ -20,7 +20,6 @@ impl Deref for TestStorage { impl TestStorage { pub async fn from_kind(kind: StorageKind, meter_provider: &AnyMeterProvider) -> Result { - docs_rs_logging::testing::init(); Self::from_config( Arc::new(Config::test_config_with_kind(kind)?), meter_provider, @@ -65,8 +64,8 @@ impl Drop for TestStorage { }); }); - if self.config.local_archive_cache_path.exists() { - std::fs::remove_dir_all(&self.config.local_archive_cache_path).unwrap(); + if self.config.archive_index_cache.path.exists() { + std::fs::remove_dir_all(&self.config.archive_index_cache.path).unwrap(); } } }