From 93a5b88cbbcb682b28972cfa0da0c0d18f0581c4 Mon Sep 17 00:00:00 2001 From: Kyle Petryszak <6314611+ProjectInitiative@users.noreply.github.com> Date: Tue, 10 Mar 2026 23:32:13 -0500 Subject: [PATCH 1/2] feat: implement in-flight tracking and debouncing to prevent redundant uploads --- Cargo.lock | 15 ++++ Cargo.toml | 1 + nixos/tests/integration.nix | 56 ++++++++----- src/main.rs | 33 ++++---- src/nix_store_watcher.rs | 153 +++++++++++++++++++++++++++++------- 5 files changed, 193 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c63ccd6..bc6a96b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1214,6 +1214,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.6.1" @@ -2269,6 +2283,7 @@ dependencies = [ "chrono", "clap", "console-subscriber", + "dashmap", "displaydoc", "ed25519-compact", "futures", diff --git a/Cargo.toml b/Cargo.toml index 65b3fd1..2ab9f01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ tokio = { version = "1.38.0", features = ["full"] } toml = "0.8.14" tracing = "0.1.40" tracing-subscriber = { version = "0.3.20", features = ["fmt", "registry", "env-filter"] } +dashmap = "6.1.0" attic = { git = "https://github.com/zhaofengli/attic", branch = "main" } displaydoc = "0.2.5" itoa = "1.0.11" diff --git a/nixos/tests/integration.nix b/nixos/tests/integration.nix index 080ce0c..d766712 100644 --- a/nixos/tests/integration.nix +++ b/nixos/tests/integration.nix @@ -49,26 +49,15 @@ jq garage loft + coreutils ]; nix.settings = { experimental-features = [ "nix-command" "flakes" ]; trusted-users = [ "root" ]; substituters = []; + sandbox = false; }; - - # Ensure we have a dummy nixpkgs available 
for nix-build - environment.variables.NIX_PATH = lib.mkForce "nixpkgs=/etc/nixpkgs"; - environment.etc."nixpkgs/default.nix".text = '' - { ... }: { - runCommand = name: env: script: derivation { - inherit name script; - builder = "/bin/sh"; - args = [ "-c" script ]; - system = "${pkgs.system}"; - }; - } - ''; networking.firewall.allowedTCPPorts = [ 3900 3901 ]; }; @@ -140,17 +129,46 @@ machine.wait_for_unit("loft.service", timeout=30) machine.log("Stress testing with 2 paths...") - p1 = machine.succeed("nix build --no-link --print-out-paths --expr '(import <nixpkgs> {}).runCommand \"p1\" {} \"echo 1 > $out\"' --impure").strip() - p2 = machine.succeed("nix build --no-link --print-out-paths --expr '(import <nixpkgs> {}).runCommand \"p2\" {} \"echo 2 > $out\"' --impure").strip() + p1 = machine.succeed("nix build --no-link --print-out-paths --expr 'derivation { name = \"p1\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo 1 > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }' --impure").strip() + p2 = machine.succeed("nix build --no-link --print-out-paths --expr 'derivation { name = \"p2\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo 2 > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }' --impure").strip() verify_cache(p1, s3_url, timeout=60) verify_cache(p2, s3_url, timeout=60) + with subtest("Concurrency: Deduplication of shared dependencies"): + reset_state() + machine.succeed("systemctl start loft.service") + machine.wait_for_unit("loft.service", timeout=30) + + # Create a shared dependency + dep = machine.succeed("nix build --no-link --print-out-paths --expr 'derivation { name = \"shared-dep\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo shared > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }' --impure").strip() + dep_hash = dep.split("/")[-1].split("-")[0] + + # Create two paths that depend on it + machine.log(f"Building p-alpha and p-beta sharing {dep_hash}") + nix_expr = "let dep = \""
+ dep + "\"; in [ (derivation { name = \"p-alpha\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo a > $out; cat ''${dep} > /dev/null\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }) (derivation { name = \"p-beta\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo b > $out; cat ''${dep} > /dev/null\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }) ]" + machine.succeed(f"nix build --no-link --expr '{nix_expr}' --impure") + + # Wait for them to show up in S3 + machine.wait_until_succeeds(with_s3(f"aws --endpoint-url http://localhost:3900 s3 ls s3://loft-test-bucket/{dep_hash}.narinfo"), timeout=60) + + # Check logs for duplicate uploads of the shared dependency + logs = machine.succeed("journalctl -u loft.service") + # We look for the "Initiating streaming multipart upload" message which happens once per upload attempt + upload_count = logs.count(f"Initiating streaming multipart upload for 'nar/{dep_hash}") + + if upload_count > 1: + # Note: It might be 0 if it was already in S3, but reset_state() clears S3. + # It should be exactly 1. + raise Exception(f"Shared dependency {dep_hash} was uploaded {upload_count} times! 
Expected 1.") + + machine.log(f"Verified: {dep_hash} was uploaded {upload_count} time(s).") + with subtest("Config: skipSignedByKeys rejection"): reset_state() machine.succeed("systemctl start loft.service") machine.succeed("nix-store --generate-binary-cache-key test-exclude-key-1 /tmp/sk1 /tmp/pk1") - p_to_sign = machine.succeed("nix build --no-link --print-out-paths --expr '(import <nixpkgs> {}).runCommand \"signed-path\" {} \"echo signed > $out\"' --impure").strip() + p_to_sign = machine.succeed("nix build --no-link --print-out-paths --expr 'derivation { name = \"signed-path\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo signed > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }' --impure").strip() machine.succeed("nix store sign --key-file /tmp/sk1 " + p_to_sign) hash_part = p_to_sign.split("/")[-1].split("-")[0] @@ -172,7 +190,7 @@ with subtest("Service: Pruning verification"): reset_state() machine.succeed("systemctl start loft.service") - p = machine.succeed("nix build --no-link --print-out-paths --expr '(import <nixpkgs> {}).runCommand \"pp\" {} \"echo prune > $out\"' --impure").strip() + p = machine.succeed("nix build --no-link --print-out-paths --expr 'derivation { name = \"pp\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo prune > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }' --impure").strip() verify_cache(p, s3_url) machine.succeed("systemctl stop loft.service") @@ -192,7 +210,7 @@ machine.log("Creating pre-existing paths...") pre_paths = [] for i in range(3): - p = machine.succeed("nix build --no-link --print-out-paths --expr '(import <nixpkgs> {}).runCommand \"pre-" + str(i) + "\" {} \"echo " + str(i) + " > $out\"' --impure").strip() + p = machine.succeed(f"nix build --no-link --print-out-paths --expr 'derivation {{ name = \"pre-{i}\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo {i} > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }}' --impure").strip() pre_paths.append(p)
secure_copy(manual_config_content, manual_config) @@ -209,7 +227,7 @@ machine.succeed("systemctl stop loft.service") bulk_paths = [] for i in range(5): - p = machine.succeed("nix build --no-link --print-out-paths --expr '(import <nixpkgs> {}).runCommand \"bulk-" + str(i) + "\" {} \"echo " + str(i) + " > $out\"' --impure").strip() + p = machine.succeed(f"nix build --no-link --print-out-paths --expr 'derivation {{ name = \"bulk-{i}\"; builder = \"/bin/sh\"; args = [ \"-c\" \"echo {i} > $out\" ]; system = \"${pkgs.system}\"; PATH = \"/run/current-system/sw/bin\"; }}' --impure").strip() bulk_paths.append(p) secure_copy(manual_config_content, manual_config) diff --git a/src/main.rs b/src/main.rs index 4eac93b..0e14bd9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use loft::{config, local_cache, nix_store_watcher, pruner, s3_uploader}; use config::Config; use local_cache::LocalCache; -use nix_store_watcher::process_path; // Import process_path +use nix_store_watcher::process_paths; // Import process_paths use pruner::Pruner; /// Command-line arguments for Loft.
@@ -128,25 +128,24 @@ async fn main() -> Result<()> { // Handle manual path uploads if let Some(paths_to_upload) = args.upload_path { - info!("Manually uploading specified paths..."); + info!("Manually uploading {} specified paths...", paths_to_upload.len()); let local_cache_clone = local_cache.clone(); let uploader_clone = uploader.clone(); let config_clone = config.clone(); - - for path in paths_to_upload { - info!("Processing manual upload path: {}", path.display()); - if let Err(e) = process_path( - uploader_clone.clone(), - local_cache_clone.clone(), - &path, - &config_clone, - true, // Force scan for manual uploads - args.dry_run, - ) - .await - { - error!("Failed to manually upload '{}': {:?}", path.display(), e); - } + let in_flight = Arc::new(dashmap::DashMap::new()); + + if let Err(e) = process_paths( + uploader_clone, + local_cache_clone, + paths_to_upload, + &config_clone, + true, // Force scan for manual uploads + args.dry_run, + in_flight, + ) + .await + { + error!("Failed to manually upload paths: {:?}", e); } info!("Finished manual uploads."); return Ok(()); // Exit after manual uploads diff --git a/src/nix_store_watcher.rs b/src/nix_store_watcher.rs index 6ef7c46..8acea4b 100644 --- a/src/nix_store_watcher.rs +++ b/src/nix_store_watcher.rs @@ -1,13 +1,16 @@ //! Watches the Nix store for changes and triggers uploads. 
use anyhow::Result; +use dashmap::DashMap; use futures::future::join_all; use notify::{Error as NotifyError, Event, RecursiveMode, Watcher}; use std::collections::HashSet; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::{mpsc, Semaphore}; -use tracing::{error, info}; +use tracing::{debug, error, info}; + +type InFlightRegistry = Arc<DashMap<String, ()>>; use crate::cache_checker::CacheChecker; use crate::config::Config; @@ -180,6 +183,7 @@ pub async fn watch_store( let (tx, mut rx) = mpsc::channel(100); let semaphore = Arc::new(Semaphore::new(config.loft.upload_threads)); let mut join_set = tokio::task::JoinSet::new(); + let in_flight: InFlightRegistry = Arc::new(DashMap::new()); let mut watcher = notify::recommended_watcher(move |res: Result<Event, NotifyError>| { if let Ok(event) = res { @@ -211,23 +215,48 @@ } path_opt = rx.recv() => { if let Some(path) = path_opt { + let mut paths_batch = vec![path]; + + // Debounce: Wait a bit to see if more paths arrive + let debounce_duration = std::time::Duration::from_millis(500); + let mut interval = tokio::time::interval(debounce_duration); + interval.tick().await; // First tick is immediate + + loop { + tokio::select!
{ + more_path = rx.recv() => { + if let Some(p) = more_path { + paths_batch.push(p); + } else { + break; + } + } + _ = interval.tick() => { + break; + } + } + } + let uploader_clone = uploader.clone(); let local_cache_clone = local_cache.clone(); let semaphore_clone = semaphore.clone(); - let config_for_task = config.clone(); // Clone config for the spawned task + let config_for_task = config.clone(); + let in_flight_clone = in_flight.clone(); + join_set.spawn(async move { let permit = semaphore_clone.acquire_owned().await.unwrap(); - if let Err(e) = process_path( + if let Err(e) = process_paths( uploader_clone, local_cache_clone, - &path, + paths_batch, &config_for_task, force_scan, dry_run, + in_flight_clone, ) .await { - error!("Failed to process '{}': {:?}", path.display(), e); + error!("Failed to process batch: {:?}", e); } drop(permit); }); @@ -244,70 +273,131 @@ Ok(()) } -/// Processes a single store path for upload. -pub async fn process_path( +/// Processes a batch of store paths for upload. +pub async fn process_paths( uploader: Arc, local_cache: Arc, - path: &Path, + paths: Vec<PathBuf>, config: &Config, force_scan: bool, dry_run: bool, + in_flight: InFlightRegistry, ) -> Result<()> { + if paths.is_empty() { + return Ok(()); + } + // Add a small delay to allow for follow-up operations like signing tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let path_str = path.to_str().unwrap_or("").to_string(); + let path_strs: Vec<String> = paths + .iter() + .filter_map(|p| p.to_str().map(|s| s.to_string())) + .collect(); + + // 1. Initial filter against in-flight registry + let filtered_input: Vec<String> = path_strs + .into_iter() + .filter(|p| { + if in_flight.contains_key(p) { + debug!("Path {} is already in flight. Skipping.", p); + false + } else { + true + } + }) + .collect(); + + if filtered_input.is_empty() { + return Ok(()); + } + + info!("Processing batch of {} store paths", filtered_input.len()); - // 1. Filter the path + // 2.
Filter input paths by signatures let keys_to_skip = config.loft.skip_signed_by_keys.clone().unwrap_or_default(); - let sigs_map = nix::get_path_signatures_bulk(std::slice::from_ref(&path_str)).await?; - let filtered_paths = nix::filter_out_sig_keys(sigs_map, keys_to_skip.clone()).await?; + let sigs_map = nix::get_path_signatures_bulk(&filtered_input).await?; + let filtered_input_map = nix::filter_out_sig_keys(sigs_map, keys_to_skip.clone()).await?; + let filtered_input_vec: Vec<String> = filtered_input_map.keys().cloned().collect(); - if filtered_paths.is_empty() { - info!("Skipping path: {}", path.display()); + if filtered_input_vec.is_empty() { + info!("No paths in batch passed signature filtering."); return Ok(()); } - // 2. Get closure - let closure = nix::get_store_path_closure(&path_str).await?; + // 3. Get closure for the entire batch + let closure_set: HashSet<String> = nix::get_store_paths_closure(&filtered_input_vec) + .await? + .into_iter() + .collect(); + + // 4. Filter closure against in-flight registry + let filtered_closure: Vec<String> = closure_set + .into_iter() + .filter(|p| { + if in_flight.contains_key(p) { + debug!("Closure path {} is already in flight. Skipping.", p); + false + } else { + true + } + }) + .collect(); + + if filtered_closure.is_empty() { + debug!("Entire closure for batch is already in flight or empty."); + return Ok(()); + } - // 3. Check caches BEFORE fetching signatures + // 5. Check caches BEFORE fetching signatures let nix_store = Arc::new(NixStore::connect()?); let checker = CacheChecker::new(uploader.clone(), local_cache.clone(), config.clone()); let mut result = checker - .check_paths(nix_store.as_ref(), &closure, force_scan) + .check_paths(nix_store.as_ref(), &filtered_closure, force_scan) .await?; if result.to_upload.is_empty() { - info!("No missing paths to upload for {}.", path.display()); + info!("No missing paths to upload for batch."); return Ok(()); } - // 4. Get signatures for only the MISSING closure paths and filter again + // 6.
Get signatures for only the MISSING closure paths and filter again let closure_signatures = nix::get_path_signatures_bulk(&result.to_upload).await?; let filtered_closure_paths = nix::filter_out_sig_keys(closure_signatures, keys_to_skip).await?; let filtered_closure_vec: Vec<String> = filtered_closure_paths.keys().cloned().collect(); + info!( - "Total paths after filtering missing closure for {}: {}", - path.display(), + "Total paths missing from cache after filtering: {}", filtered_closure_vec.len() ); result.to_upload = filtered_closure_vec; if dry_run { - info!("DRY RUN: The following {} paths would be uploaded for {}:", result.to_upload.len(), path.display()); + info!("DRY RUN: The following {} paths would be uploaded:", result.to_upload.len()); for p in result.to_upload { info!(" DRY RUN: Would upload {}", p); } return Ok(()); } - // 5. Upload missing + // 7. Register all paths about to be uploaded as in-flight + let mut actual_in_flight = Vec::new(); + for p in &result.to_upload { + if in_flight.insert(p.clone(), ()).is_none() { + actual_in_flight.push(p.clone()); + } + } + + if actual_in_flight.is_empty() { + return Ok(()); + } + + // 8. Upload missing let semaphore = Arc::new(Semaphore::new(config.loft.upload_threads)); let mut tasks = Vec::new(); - for path_str in result.to_upload { + for path_str in &actual_in_flight { let uploader_clone = uploader.clone(); let config_clone = config.clone(); let semaphore_clone = semaphore.clone(); @@ -338,21 +428,26 @@ pub async fn process_path( .filter_map(|res| res.ok().flatten()) .collect(); + // 9.
Unregister in-flight paths + for p in &actual_in_flight { + in_flight.remove(p); + } + if !uploaded_hashes.is_empty() { if let Err(e) = local_cache.add_many_path_hashes(&uploaded_hashes) { error!( - "Failed to batch add paths to local cache for {}: {:?}", - path.display(), + "Failed to batch add paths to local cache: {:?}", e ); } else { info!( - "Successfully added {} paths to local cache for {}.", + "Successfully added {} paths to local cache.", uploaded_hashes.len(), - path.display() ); } } Ok(()) } + + From 7d23b0e5edc7019e65e1cedc6c21af8f0a93837b Mon Sep 17 00:00:00 2001 From: Kyle Petryszak <6314611+ProjectInitiative@users.noreply.github.com> Date: Tue, 10 Mar 2026 23:34:20 -0500 Subject: [PATCH 2/2] chore: bump version to 0.3.2 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc6a96b..7acb800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2270,7 +2270,7 @@ dependencies = [ [[package]] name = "loft" -version = "0.3.1" +version = "0.3.2" dependencies = [ "anyhow", "async-compression", diff --git a/Cargo.toml b/Cargo.toml index 2ab9f01..832de8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "loft" -version = "0.3.1" +version = "0.3.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html