diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b67cf18f9..b2d491ef5 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -16,10 +16,32 @@ * */ +use crate::catalog::{self, snapshot::Snapshot}; +use crate::event::format::LogSource; +use crate::event::format::LogSourceEntry; +use crate::handlers::DatasetTag; +use crate::handlers::http::fetch_schema; +use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; +use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; +use crate::metrics::increment_parquets_stored_by_date; +use crate::metrics::increment_parquets_stored_size_by_date; +use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; +use crate::option::Mode; +use crate::parseable::DEFAULT_TENANT; +use crate::parseable::{LogStream, PARSEABLE, Stream}; +use crate::stats::FullStats; +use crate::storage::SETTINGS_ROOT_DIRECTORY; +use crate::storage::TARGETS_ROOT_DIRECTORY; +use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; +use crate::storage::field_stats::calculate_field_stats; +use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; +use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use dashmap::mapref::entry::Entry; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use itertools::Itertools; use object_store::ListResult; @@ -43,28 +65,6 @@ use tokio::task::JoinSet; use tracing::{Instrument, error, info, info_span, warn}; use ulid::Ulid; -use crate::catalog::{self, snapshot::Snapshot}; -use crate::event::format::LogSource; -use crate::event::format::LogSourceEntry; -use crate::handlers::DatasetTag; -use crate::handlers::http::fetch_schema; -use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; -use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR}; -use crate::metrics::increment_parquets_stored_by_date; -use crate::metrics::increment_parquets_stored_size_by_date; -use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; -use crate::option::Mode; -use crate::parseable::DEFAULT_TENANT; -use crate::parseable::{LogStream, PARSEABLE, Stream}; -use crate::stats::FullStats; -use crate::storage::SETTINGS_ROOT_DIRECTORY; -use crate::storage::TARGETS_ROOT_DIRECTORY; -use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; -use crate::storage::field_stats::calculate_field_stats; -use crate::storage::field_stats::extract_datetime_from_parquet_path_regex; -use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES; - use super::{ ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, @@ -1071,21 +1071,20 @@ async fn process_parquet_files( let object_store = PARSEABLE.storage().get_object_store(); // collect all parquet files to upload - let parquet_paths = { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - - let parquet_paths: Vec = upload_context - .stream - .parquet_files() - .into_par_iter() - .filter(|p| !guard.contains(p)) - .collect(); - - let mut ret = Vec::with_capacity(parquet_paths.len()); - ret.clone_from(&parquet_paths); - guard.extend(parquet_paths); - ret - }; + let parquet_paths: Vec = upload_context + .stream + .parquet_files() + .into_par_iter() + .filter_map( + |path| match ACTIVE_OBJECT_STORE_SYNC_FILES.entry(path.clone()) { + Entry::Vacant(entry) => { + entry.insert(Instant::now()); + Some(path) + } + Entry::Occupied(_) => None, + }, + ) + .collect(); let mut total_size: u64 = 0; let mut min_dt: Option> = None; @@ -1184,18 +1183,12 @@ async fn collect_upload_results( "Parquet file upload size validation failed for {:?}, preserving in staging for retry", upload_result.file_path ); - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - guard.remove(&upload_result.file_path); - } + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&upload_result.file_path); } } Ok(Err((path, e))) => { error!("Error uploading parquet file: {e}"); - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - guard.remove(&path); - } + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&path); return Err(e); } Err(e) => { @@ -1205,20 +1198,15 @@ async fn collect_upload_results( } } - // successfully uploaded files, remove from in-mem hashset - { - let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await; - for (path, _) in uploaded_files.iter() { - guard.remove(path); - } - - // check if file has been in hashset for more than 5 minutes - let now = Utc::now(); - guard.retain(|f| { - !extract_datetime_from_parquet_path_regex(f) - .is_ok_and(|ts| (now - ts).num_minutes() >= 5) - }); + // successfully uploaded files, remove from DashMap + for (path, _) in uploaded_files.iter() { + ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path); } + // Use monotonic time to ensure the 5-minute eviction window(cleanup) is immune to system clock adjustments. + let now = Instant::now(); + ACTIVE_OBJECT_STORE_SYNC_FILES.retain(|_, tracked_instant| { + now.duration_since(*tracked_instant) < Duration::from_secs(300) + }); let manifest_files: Vec<_> = uploaded_files .into_par_iter() .map(|(path, manifest_file)| { diff --git a/src/sync.rs b/src/sync.rs index 14a88ab38..690ef7db8 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,16 +17,15 @@ */ use chrono::{TimeDelta, Timelike}; -use datafusion::common::HashSet; +use dashmap::DashMap; use futures::FutureExt; use once_cell::sync::Lazy; use std::collections::HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; use std::path::PathBuf; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use tokio::sync::{RwLock, mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinSet; use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; @@ -35,8 +34,8 @@ use tracing::{Instrument, error, info, info_span, trace, warn}; static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false); -pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(HashSet::new()))); +pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy> = + Lazy::new(DashMap::new); /// RAII guard that clears a sync-running flag on drop, so a panic inside the /// sync body cannot leave the flag stuck at `true` and wedge future ticks. struct SyncRunningGuard(&'static AtomicBool);