track insertion time instead of data time for eviction #1650
WalkthroughSwap HashSet+locks for a global ChangesActive sync tracking refactor
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)
1189-1209: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Drain the batch before returning the first upload error.
Line 1192 exits before the cleanup at Lines 1201-1209. If one upload fails after earlier uploads already succeeded, those completed paths stay registered in `ACTIVE_OBJECT_STORE_SYNC_FILES` and get skipped on the next sync until a later eviction pass clears them. That reintroduces the same “stuck active file” behavior this PR is trying to remove.
Suggested fix
 async fn collect_upload_results(
     mut join_set: JoinSet<Result<UploadResult, (PathBuf, ObjectStorageError)>>,
 ) -> Result<Vec<catalog::manifest::File>, ObjectStorageError> {
     let mut uploaded_files = Vec::new();
+    let mut first_error = None;

     while let Some(result) = join_set.join_next().await {
         match result {
             Ok(Ok(upload_result)) => {
                 if let Some(manifest_file) = upload_result.manifest_file {
@@
             }
             Ok(Err((path, e))) => {
                 error!("Error uploading parquet file: {e}");
                 ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&path);
-                return Err(e);
+                if first_error.is_none() {
+                    first_error = Some(e);
+                }
             }
             Err(e) => {
                 error!("Task panicked: {e}");
-                return Err(ObjectStorageError::UnhandledError(Box::new(e)));
+                if first_error.is_none() {
+                    first_error = Some(ObjectStorageError::UnhandledError(Box::new(e)));
+                }
             }
         }
     }

-    // successfully uploaded files, remove from DashMap
-    for (path, _) in uploaded_files.iter() {
+    for (path, _) in &uploaded_files {
         ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path);
     }

-    // Use monotonic time to ensure the 5-minute eviction window(cleanup) is immune to system clock adjustments.
+
     let now = Instant::now();
     ACTIVE_OBJECT_STORE_SYNC_FILES.retain(|_, tracked_instant| {
         now.duration_since(*tracked_instant) < Duration::from_secs(300)
     });
+
+    if let Some(err) = first_error {
+        return Err(err);
+    }
+
     let manifest_files: Vec<_> = uploaded_files
         .into_par_iter()
         .map(|(path, manifest_file)| {
             if let Err(e) = remove_file(&path) {
                 warn!("Failed to remove staged file: {e}");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/storage/object_storage.rs` around lines 1189 - 1209, The Ok(Err((path, e))) branch returns early without removing already-successful paths from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving them stuck; modify the error paths (both the Ok(Err((path, e))) match arm and the Err(e) panic arm) to first iterate uploaded_files and remove each path from ACTIVE_OBJECT_STORE_SYNC_FILES (the same loop used later: for (path, _) in uploaded_files.iter() { ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path); }), perform the monotonic-time retain cleanup, and only then return the error (Err(e) or Err(ObjectStorageError::UnhandledError(...))). Ensure you reference the uploaded_files variable and ACTIVE_OBJECT_STORE_SYNC_FILES so the cleanup always runs before returning.
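The drain-then-return pattern this comment asks for can be sketched in isolation. The following is a minimal, std-only illustration and not the project's code: `UploadOutcome`, `collect_outcomes`, and the `HashSet` used as the active set are hypothetical stand-ins for the `JoinSet` results and `ACTIVE_OBJECT_STORE_SYNC_FILES`, and errors are plain strings.

```rust
use std::collections::HashSet;
use std::path::PathBuf;

/// Hypothetical, simplified upload result: the uploaded path on success,
/// or the failing path plus an error message.
pub type UploadOutcome = Result<PathBuf, (PathBuf, String)>;

/// Processes every outcome, releases every path from `active`,
/// and only then reports the first error that was seen.
pub fn collect_outcomes(
    outcomes: Vec<UploadOutcome>,
    active: &mut HashSet<PathBuf>,
) -> Result<Vec<PathBuf>, String> {
    let mut uploaded = Vec::new();
    let mut first_error: Option<String> = None;

    for outcome in outcomes {
        match outcome {
            Ok(path) => uploaded.push(path),
            Err((path, err)) => {
                // Release the failed path, remember the error, keep draining.
                active.remove(&path);
                if first_error.is_none() {
                    first_error = Some(err);
                }
            }
        }
    }

    // Cleanup runs on both the success path and the error path.
    for path in &uploaded {
        active.remove(path);
    }

    match first_error {
        Some(err) => Err(err),
        None => Ok(uploaded),
    }
}
```

With an early `return` in the error arm, any path already pushed to `uploaded` would stay in `active`. Deferring the return until after the cleanup loop is what keeps the set consistent.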
📒 Files selected for processing (2)
src/storage/object_storage.rs
src/sync.rs
Fixes #1634
Description
Goal: Fix silent upload stalls that occur during historical event ingestion with time partitions. Files should be evicted from the active sync tracking list based on how long they have been in the upload queue, not on how old the data inside them is.
Possible Solutions and Chosen Rationale
The issue stemmed from a conceptual mismatch: data age (when the event occurred) is fundamentally different from tracking age (how long the file has been in the queue). For real-time ingestion, these are nearly identical, so the bug went undetected. For historical ingestion (e.g., Nov 2025 events ingested in May 2026), they diverge by months or years.
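To make tracking age concrete, here is a minimal sketch of a reservation table keyed by path that records a monotonic `Instant` at insertion and evicts purely on elapsed queue time. It is an illustration under stated assumptions, not the patch itself: it uses a std `Mutex<HashMap>` in place of the PR's concurrent map so it has no external dependencies, and `ActiveSyncTracker`, `try_reserve`, `release`, and `evict_older_than` are hypothetical names.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the active sync tracking table.
/// The value is the moment the path was queued, not the data timestamp.
pub struct ActiveSyncTracker {
    inner: Mutex<HashMap<PathBuf, Instant>>,
}

impl ActiveSyncTracker {
    pub fn new() -> Self {
        Self { inner: Mutex::new(HashMap::new()) }
    }

    /// Reserves `path` at most once. Returns `true` if this call took the
    /// reservation. The check and the insert happen in a single entry lookup,
    /// so there is no gap between "is it present?" and "insert it".
    pub fn try_reserve(&self, path: &Path, now: Instant) -> bool {
        let mut map = self.inner.lock().unwrap();
        match map.entry(path.to_path_buf()) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(now);
                true
            }
        }
    }

    /// Releases a reservation, for example after an upload finishes.
    pub fn release(&self, path: &Path) {
        self.inner.lock().unwrap().remove(path);
    }

    /// Drops every entry that has been tracked for `max_age` or longer.
    /// Only elapsed monotonic time matters; the file name is never parsed.
    pub fn evict_older_than(&self, now: Instant, max_age: Duration) {
        let mut map = self.inner.lock().unwrap();
        map.retain(|_, tracked| now.saturating_duration_since(*tracked) < max_age);
    }

    /// Number of paths currently reserved.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}
```

In this sketch a file holding months-old events and a file holding fresh events are treated identically: both are evicted once they have been reserved for the configured window (5 minutes in this PR), regardless of the timestamps in their names.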
The previous solution parsed the data timestamp from the parquet filename via regex and compared it against a 5-minute threshold to determine staleness. The chosen solution changes the tracking structure from a globally locked active-file set to a `DashMap<PathBuf, Instant>`, recording the exact insertion time for each queued file using a monotonic `Instant`. This makes cleanup duration-based and timestamp-agnostic. Using `Instant` specifically guarantees that the 5-minute eviction window is immune to OS clock adjustments or NTP drift. `DashMap` was chosen over `RwLock<HashMap<...>>` because this structure is used as a per-file reservation table. It allows unrelated file paths to be reserved and removed concurrently through internal sharding, avoiding a single global write lock. The reservation logic uses DashMap’s entry API to preserve the “reserve once” invariant and avoid `contains_key` + `insert` races.
Key Changes Made in the Patch
src/sync.rs
- Changed `ACTIVE_OBJECT_STORE_SYNC_FILES` from a globally locked set/map to `DashMap<PathBuf, Instant>`.
src/storage/object_storage.rs
- Updated `process_parquet_files()` to reserve paths using DashMap’s entry API and record `Instant::now()` when a path enters active sync tracking.
- Updated cleanup to evict entries based on the tracked `Instant` instead of regex-parsing the data timestamp from the filename.
This PR has: