From f6993c2f68d9d807b850b11a3414a23259178f49 Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Sun, 24 May 2026 17:52:56 +0530 Subject: [PATCH 1/2] track insertion time instead of data time for eviction --- src/storage/object_storage.rs | 36 +++++++++++++++++++++++++++-------- src/sync.rs | 16 ++++++++++++---- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b67cf18f9..f10743693 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -63,7 +63,7 @@ use crate::storage::TARGETS_ROOT_DIRECTORY; use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; use crate::storage::field_stats::calculate_field_stats; use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; -use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES; +use crate::sync::{ACTIVE_OBJECT_STORE_SYNC_FILES, SyncFileEntry}; use super::{ ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, @@ -1078,12 +1078,20 @@ async fn process_parquet_files( .stream .parquet_files() .into_par_iter() - .filter(|p| !guard.contains(p)) + .filter(|p| !guard.contains_key(p)) .collect(); let mut ret = Vec::with_capacity(parquet_paths.len()); ret.clone_from(&parquet_paths); - guard.extend(parquet_paths); + for path in parquet_paths { + guard.insert( + path, + SyncFileEntry { + tracked_instant: Instant::now(), + tracked_utc: Utc::now(), + }, + ); + } ret }; @@ -1212,11 +1220,23 @@ async fn collect_upload_results( guard.remove(path); } - // check if file has been in hashset for more than 5 minutes - let now = Utc::now(); - guard.retain(|f| { - !extract_datetime_from_parquet_path_regex(f) - .is_ok_and(|ts| (now - ts).num_minutes() >= 5) + // Use monotonic time to ensure the 5-minute eviction window is immune to system clock adjustments. + let now = Instant::now(); + guard.retain(|path, entry| { + let elapsed = now.duration_since(entry.tracked_instant); + // 5 min eviction window + if elapsed >= Duration::from_secs(300) { + // Added log for observability and debugging + warn!( + "Removing stale sync file: {} (elapsed: {}s, tracked_at: {})", + path.display(), + elapsed.as_secs(), + entry.tracked_utc.to_rfc3339(), + ); + false + } else { + true + } }); } let manifest_files: Vec<_> = uploaded_files diff --git a/src/sync.rs b/src/sync.rs index 14a88ab38..e0ae0aa67 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -16,8 +16,7 @@ * */ -use chrono::{TimeDelta, Timelike}; -use datafusion::common::HashSet; +use chrono::{DateTime, TimeDelta, Timelike, Utc}; use futures::FutureExt; use once_cell::sync::Lazy; use std::collections::HashMap; @@ -35,8 +34,17 @@ use tracing::{Instrument, error, info, info_span, trace, warn}; static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); -pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(HashSet::new()))); +// tracks metadata for files being synced to object storage +#[derive(Clone, Debug)] +pub struct SyncFileEntry { + // Monotonic instant when file was added to tracking + pub tracked_instant: std::time::Instant, + // Wall-clock UTC timestamp for observability + pub tracked_utc: DateTime, +} + +pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy>>> = + Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); /// RAII guard that clears a sync-running flag on drop, so a panic inside the /// sync body cannot leave the flag stuck at `true` and wedge future ticks. struct SyncRunningGuard(&'static AtomicBool); From 7febfd4aa937d10da8dc2041a7eb05bff75f1490 Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Mon, 25 May 2026 13:41:48 +0530 Subject: [PATCH 2/2] Use DashMap for concurrent cache access --- src/storage/object_storage.rs | 124 +++++++++++++--------------------- src/sync.rs | 19 ++---- 2 files changed, 51 insertions(+), 92 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index f10743693..b2d491ef5 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -16,10 +16,32 @@ * */ +use crate::catalog::{self, snapshot::Snapshot}; +use crate::event::format::LogSource; +use crate::event::format::LogSourceEntry; +use crate::handlers::DatasetTag; +use crate::handlers::http::fetch_schema; +use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; +use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; +use crate::metrics::increment_parquets_stored_by_date; +use crate::metrics::increment_parquets_stored_size_by_date; +use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; +use crate::option::Mode; +use crate::parseable::DEFAULT_TENANT; +use crate::parseable::{LogStream, PARSEABLE, Stream}; +use crate::stats::FullStats; +use crate::storage::SETTINGS_ROOT_DIRECTORY; +use crate::storage::TARGETS_ROOT_DIRECTORY; +use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; +use crate::storage::field_stats::calculate_field_stats; +use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; +use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use dashmap::mapref::entry::Entry; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use itertools::Itertools; use object_store::ListResult; @@ -43,28 +65,6 @@ use tokio::task::JoinSet; use tracing::{Instrument, error, info, info_span, warn}; use ulid::Ulid; -use crate::catalog::{self, snapshot::Snapshot}; -use crate::event::format::LogSource; -use crate::event::format::LogSourceEntry; -use crate::handlers::DatasetTag; -use crate::handlers::http::fetch_schema; -use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; -use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; -use crate::metrics::increment_parquets_stored_by_date; -use crate::metrics::increment_parquets_stored_size_by_date; -use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; -use crate::option::Mode; -use crate::parseable::DEFAULT_TENANT; -use crate::parseable::{LogStream, PARSEABLE, Stream}; -use crate::stats::FullStats; -use crate::storage::SETTINGS_ROOT_DIRECTORY; -use crate::storage::TARGETS_ROOT_DIRECTORY; -use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; -use crate::storage::field_stats::calculate_field_stats; -use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; -use crate::sync::{ACTIVE_OBJECT_STORE_SYNC_FILES, SyncFileEntry}; - use super::{ ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, @@ -1071,29 +1071,20 @@ async fn process_parquet_files( let object_store = PARSEABLE.storage().get_object_store(); // collect all parquet files to upload - let parquet_paths = { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - - let parquet_paths: Vec = upload_context - .stream - .parquet_files() - .into_par_iter() - .filter(|p| !guard.contains_key(p)) - .collect(); - - let mut ret = Vec::with_capacity(parquet_paths.len()); - ret.clone_from(&parquet_paths); - for path in parquet_paths { - guard.insert( - path, - SyncFileEntry { - tracked_instant: Instant::now(), - tracked_utc: Utc::now(), - }, - ); - } - ret - }; + let parquet_paths: Vec = upload_context + .stream + .parquet_files() + .into_par_iter() + .filter_map( + |path| match ACTIVE_OBJECT_STORE_SYNC_FILES.entry(path.clone()) { + Entry::Vacant(entry) => { + entry.insert(Instant::now()); + Some(path) + } + Entry::Occupied(_) => None, + }, + ) + .collect(); let mut total_size: u64 = 0; let mut min_dt: Option> = None; @@ -1192,18 +1183,12 @@ async fn collect_upload_results( "Parquet file upload size validation failed for {:?}, preserving in staging for retry", upload_result.file_path ); - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - guard.remove(&upload_result.file_path); - } + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&upload_result.file_path); } } Ok(Err((path, e))) => { error!("Error uploading parquet file: {e}"); - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - guard.remove(&path); - } + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&path); return Err(e); } Err(e) => { @@ -1213,32 +1198,15 @@ async fn collect_upload_results( } } - // successfully uploaded files, remove from in-mem hashset - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - for (path, _) in uploaded_files.iter() { - guard.remove(path); - } - - // Use monotonic time to ensure the 5-minute eviction window is immune to system clock adjustments. - let now = Instant::now(); - guard.retain(|path, entry| { - let elapsed = now.duration_since(entry.tracked_instant); - // 5 min eviction window - if elapsed >= Duration::from_secs(300) { - // Added log for observability and debugging - warn!( - "Removing stale sync file: {} (elapsed: {}s, tracked_at: {})", - path.display(), - elapsed.as_secs(), - entry.tracked_utc.to_rfc3339(), - ); - false - } else { - true - } - }); + // successfully uploaded files, remove from DashMap + for (path, _) in uploaded_files.iter() { + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path); } + // Use monotonic time to ensure the 5-minute eviction window(cleanup) is immune to system clock adjustments. + let now = Instant::now(); + ACTIVE_OBJECT_STORE_SYNC_FILES.retain(|_, tracked_instant| { + now.duration_since(*tracked_instant) < Duration::from_secs(300) + }); let manifest_files: Vec<_> = uploaded_files .into_par_iter() .map(|(path, manifest_file)| { diff --git a/src/sync.rs b/src/sync.rs index e0ae0aa67..690ef7db8 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -16,16 +16,16 @@ * */ -use chrono::{DateTime, TimeDelta, Timelike, Utc}; +use chrono::{TimeDelta, Timelike}; +use dashmap::DashMap; use futures::FutureExt; use once_cell::sync::Lazy; use std::collections::HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; use std::path::PathBuf; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use tokio::sync::{RwLock, mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinSet; use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; @@ -34,17 +34,8 @@ use tracing::{Instrument, error, info, info_span, trace, warn}; static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); -// tracks metadata for files being synced to object storage -#[derive(Clone, Debug)] -pub struct SyncFileEntry { - // Monotonic instant when file was added to tracking - pub tracked_instant: std::time::Instant, - // Wall-clock UTC timestamp for observability - pub tracked_utc: DateTime, -} - -pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); +pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy> = + Lazy::new(DashMap::new); /// RAII guard that clears a sync-running flag on drop, so a panic inside the /// sync body cannot leave the flag stuck at `true` and wedge future ticks. struct SyncRunningGuard(&'static AtomicBool);