Skip to content

track insertion time instead of data time for eviction#1650

Open
ygndotgg wants to merge 2 commits into
parseablehq:mainfrom
ygndotgg:fix/1634-stale-eviction-timestamp
Open

track insertion time instead of data time for eviction#1650
ygndotgg wants to merge 2 commits into
parseablehq:mainfrom
ygndotgg:fix/1634-stale-eviction-timestamp

Conversation

@ygndotgg
Copy link
Copy Markdown

@ygndotgg ygndotgg commented May 24, 2026


Fixes #1634

Description

Goal: Fix silent upload stalls occurring during historical event ingestion with time partitions. The goal is to ensure files are evicted from the active sync tracking list based on how long they have been in the upload queue, rather than how old the data inside them is.

Possible Solutions and Chosen Rationale

The issue stemmed from a conceptual mismatch: data age (when the event occurred) is fundamentally different from tracking age (how long the file has been in the queue). For real-time ingestion, these are nearly identical, so the bug went undetected. For historical ingestion (e.g., Nov 2025 events ingested in May 2026), they diverge by months or years.

The previous solution parsed the data timestamp from the parquet filename via regex and compared it against a 5-minute threshold to determine staleness. The chosen solution changes the tracking structure from a globally locked active-file set to a DashMap<PathBuf, Instant>, recording the exact insertion time for each queued file using a monotonic Instant. This makes cleanup duration-based and timestamp-agnostic. Using Instant specifically guarantees that the 5-minute eviction window is immune to OS clock adjustments or NTP drift.

DashMap was chosen over RwLock<HashMap<...>> because this structure is used as a per-file reservation table. It allows unrelated file paths to be reserved and removed concurrently through internal sharding, avoiding a single global write lock. The reservation logic uses DashMap’s entry API to preserve the “reserve once” invariant and avoid contains_key + insert races.

Key Changes Made in the Patch

src/sync.rs

  • Changed ACTIVE_OBJECT_STORE_SYNC_FILES from a globally locked set/map to DashMap<PathBuf, Instant>.
  • Removed the extra tracking struct and wall-clock timestamp because stale eviction only needs monotonic insertion time.

src/storage/object_storage.rs

  • Updated process_parquet_files() to reserve paths using DashMap’s entry API and record Instant::now() when a path enters active sync tracking.
  • Updated upload failure and validation failure cleanup to remove paths directly from DashMap.
  • Updated successful upload cleanup to remove uploaded paths directly from DashMap.
  • Updated stale eviction to compare elapsed time from the stored Instant instead of regex-parsing the data timestamp from the filename.

This PR has:

  • been tested to ensure log ingestion and log query works.
  • added documentation for new or modified features or behaviors.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.

Summary by CodeRabbit

  • Refactor
    • Enhanced tracking of active sync files with improved timestamp management for more reliable cleanup of stale entries.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 24, 2026

Walkthrough

Swap HashSet+locks for a global DashMap<PathBuf, Instant>. Insert per-path Instant::now() via entry() when staging parquet files, skip already-tracked paths, remove entries on upload failure or error, and evict stale entries by checking Instant::now().duration_since(tracked_instant) < 300s.

Changes

Active sync tracking refactor

Layer / File(s) Summary
Tracking static and imports
src/sync.rs, src/storage/object_storage.rs
Replace Arc<RwLock<HashSet<PathBuf>>> with Lazy<DashMap<PathBuf, std::time::Instant>>, add dashmap imports, and relocate import sites used by staging logic.
Staging insertion using DashMap::entry
src/storage/object_storage.rs
Use ACTIVE_OBJECT_STORE_SYNC_FILES.entry(path) to insert Instant::now() for vacant parquet paths and only queue newly-inserted paths for upload.
Remove map entries on upload failure/error
src/storage/object_storage.rs
Remove the staged path from ACTIVE_OBJECT_STORE_SYNC_FILES when uploads produce no manifest (size validation failure) or when the upload task returns an error.
Monotonic-duration-based eviction
src/storage/object_storage.rs
Evict stale tracked entries by comparing the stored Instant to Instant::now() and retaining entries with elapsed < 300s instead of parsing timestamps from filenames.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • parmesant
  • nikhilsinhaparseable

Poem

🐰 I swapped a filename tick for one that won't lag,
Instant.now() keeps each path in my bag.
When uploads fail or time runs thin,
I clean the map and let new jobs begin.
Hooray — no stale race, just steady hop and wag!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is comprehensive and well-structured, covering the goal, rationale, and key changes. However, the author explicitly marked two required checklist items as incomplete: adding documentation and adding code comments. Add documentation for the DashMap-based tracking structure and include inline comments explaining the monotonic Instant-based eviction logic and race condition prevention in the entry API usage.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed Title clearly and concisely describes the core change: switching from data timestamp-based to insertion time-based eviction, which directly addresses the main issue.
Linked Issues check ✅ Passed PR fully implements issue #1634: replaces filename-based eviction with duration-based tracking using insertion timestamps, changes HashSet to HashMap/DashMap, and uses monotonic Instant for safe eviction math.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing #1634: converting tracking from HashSet to DashMap and replacing data-timestamp with insertion-time eviction logic.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@nitisht nitisht requested a review from parmesant May 24, 2026 12:46
coderabbitai[bot]
coderabbitai Bot previously approved these changes May 24, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)

1189-1209: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Drain the batch before returning the first upload error.

Line 1192 exits before the cleanup at Lines 1201-1209. If one upload fails after earlier uploads already succeeded, those completed paths stay registered in ACTIVE_OBJECT_STORE_SYNC_FILES and get skipped on the next sync until a later eviction pass clears them. That reintroduces the same “stuck active file” behavior this PR is trying to remove.

Suggested fix
 async fn collect_upload_results(
     mut join_set: JoinSet<Result<UploadResult, (PathBuf, ObjectStorageError)>>,
 ) -> Result<Vec<catalog::manifest::File>, ObjectStorageError> {
     let mut uploaded_files = Vec::new();
+    let mut first_error = None;
 
     while let Some(result) = join_set.join_next().await {
         match result {
             Ok(Ok(upload_result)) => {
                 if let Some(manifest_file) = upload_result.manifest_file {
@@
             }
             Ok(Err((path, e))) => {
                 error!("Error uploading parquet file: {e}");
                 ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&path);
-                return Err(e);
+                if first_error.is_none() {
+                    first_error = Some(e);
+                }
             }
             Err(e) => {
                 error!("Task panicked: {e}");
-                return Err(ObjectStorageError::UnhandledError(Box::new(e)));
+                if first_error.is_none() {
+                    first_error = Some(ObjectStorageError::UnhandledError(Box::new(e)));
+                }
             }
         }
     }
 
-    // successfully uploaded files, remove from DashMap
-    for (path, _) in uploaded_files.iter() {
+    for (path, _) in &uploaded_files {
         ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path);
     }
-    // Use monotonic time to ensure the 5-minute eviction window(cleanup) is immune to system clock adjustments.
+
     let now = Instant::now();
     ACTIVE_OBJECT_STORE_SYNC_FILES.retain(|_, tracked_instant| {
         now.duration_since(*tracked_instant) < Duration::from_secs(300)
     });
+
+    if let Some(err) = first_error {
+        return Err(err);
+    }
+
     let manifest_files: Vec<_> = uploaded_files
         .into_par_iter()
         .map(|(path, manifest_file)| {
             if let Err(e) = remove_file(&path) {
                 warn!("Failed to remove staged file: {e}");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/object_storage.rs` around lines 1189 - 1209, The Ok(Err((path,
e))) branch returns early without removing already-successful paths from
ACTIVE_OBJECT_STORE_SYNC_FILES, leaving them stuck; modify the error paths (both
the Ok(Err((path, e))) match arm and the Err(e) panic arm) to first iterate
uploaded_files and remove each path from ACTIVE_OBJECT_STORE_SYNC_FILES (the
same loop used later: for (path, _) in uploaded_files.iter() {
ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path); }), perform the monotonic-time
retain cleanup, and only then return the error (Err(e) or
Err(ObjectStorageError::UnhandledError(...))). Ensure you reference the
uploaded_files variable and ACTIVE_OBJECT_STORE_SYNC_FILES so the cleanup always
runs before returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/storage/object_storage.rs`:
- Around line 1189-1209: The Ok(Err((path, e))) branch returns early without
removing already-successful paths from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving
them stuck; modify the error paths (both the Ok(Err((path, e))) match arm and
the Err(e) panic arm) to first iterate uploaded_files and remove each path from
ACTIVE_OBJECT_STORE_SYNC_FILES (the same loop used later: for (path, _) in
uploaded_files.iter() { ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path); }), perform
the monotonic-time retain cleanup, and only then return the error (Err(e) or
Err(ObjectStorageError::UnhandledError(...))). Ensure you reference the
uploaded_files variable and ACTIVE_OBJECT_STORE_SYNC_FILES so the cleanup always
runs before returning.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 07b52d33-f64f-4959-b2c6-e16b530a9b3b

📥 Commits

Reviewing files that changed from the base of the PR and between f6993c2 and 7febfd4.

📒 Files selected for processing (2)
  • src/storage/object_storage.rs
  • src/sync.rs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

improvement: Use tracking timestamps instead of data timestamps for stale-entry eviction in ACTIVE_OBJECT_STORE_SYNC_FILES

1 participant