Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 46 additions & 58 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,32 @@
*
*/

use crate::catalog::{self, snapshot::Snapshot};
use crate::event::format::LogSource;
use crate::event::format::LogSourceEntry;
use crate::handlers::DatasetTag;
use crate::handlers::http::fetch_schema;
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::increment_parquets_stored_by_date;
use crate::metrics::increment_parquets_stored_size_by_date;
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE};
use crate::option::Mode;
use crate::parseable::DEFAULT_TENANT;
use crate::parseable::{LogStream, PARSEABLE, Stream};
use crate::stats::FullStats;
use crate::storage::SETTINGS_ROOT_DIRECTORY;
use crate::storage::TARGETS_ROOT_DIRECTORY;
use crate::storage::field_stats::DATASET_STATS_STREAM_NAME;
use crate::storage::field_stats::calculate_field_stats;
use crate::storage::field_stats::extract_datetime_from_parquet_path_regex;
use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES;
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use dashmap::mapref::entry::Entry;
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use itertools::Itertools;
use object_store::ListResult;
Expand All @@ -43,28 +65,6 @@ use tokio::task::JoinSet;
use tracing::{Instrument, error, info, info_span, warn};
use ulid::Ulid;

use crate::catalog::{self, snapshot::Snapshot};
use crate::event::format::LogSource;
use crate::event::format::LogSourceEntry;
use crate::handlers::DatasetTag;
use crate::handlers::http::fetch_schema;
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::handlers::http::users::{FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::increment_parquets_stored_by_date;
use crate::metrics::increment_parquets_stored_size_by_date;
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE};
use crate::option::Mode;
use crate::parseable::DEFAULT_TENANT;
use crate::parseable::{LogStream, PARSEABLE, Stream};
use crate::stats::FullStats;
use crate::storage::SETTINGS_ROOT_DIRECTORY;
use crate::storage::TARGETS_ROOT_DIRECTORY;
use crate::storage::field_stats::DATASET_STATS_STREAM_NAME;
use crate::storage::field_stats::calculate_field_stats;
use crate::storage::field_stats::extract_datetime_from_parquet_path_regex;
use crate::sync::ACTIVE_OBJECT_STORE_SYNC_FILES;

use super::{
ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat,
PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
Expand Down Expand Up @@ -1071,21 +1071,20 @@ async fn process_parquet_files(
let object_store = PARSEABLE.storage().get_object_store();

// collect all parquet files to upload
let parquet_paths = {
let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;

let parquet_paths: Vec<PathBuf> = upload_context
.stream
.parquet_files()
.into_par_iter()
.filter(|p| !guard.contains(p))
.collect();

let mut ret = Vec::with_capacity(parquet_paths.len());
ret.clone_from(&parquet_paths);
guard.extend(parquet_paths);
ret
};
let parquet_paths: Vec<PathBuf> = upload_context
.stream
.parquet_files()
.into_par_iter()
.filter_map(
|path| match ACTIVE_OBJECT_STORE_SYNC_FILES.entry(path.clone()) {
Entry::Vacant(entry) => {
entry.insert(Instant::now());
Some(path)
}
Entry::Occupied(_) => None,
},
)
.collect();

let mut total_size: u64 = 0;
let mut min_dt: Option<chrono::DateTime<Utc>> = None;
Expand Down Expand Up @@ -1184,18 +1183,12 @@ async fn collect_upload_results(
"Parquet file upload size validation failed for {:?}, preserving in staging for retry",
upload_result.file_path
);
{
let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
guard.remove(&upload_result.file_path);
}
ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&upload_result.file_path);
}
}
Ok(Err((path, e))) => {
error!("Error uploading parquet file: {e}");
{
let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
guard.remove(&path);
}
ACTIVE_OBJECT_STORE_SYNC_FILES.remove(&path);
return Err(e);
}
Err(e) => {
Expand All @@ -1205,20 +1198,15 @@ async fn collect_upload_results(
}
}

// successfully uploaded files, remove from in-mem hashset
{
let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
for (path, _) in uploaded_files.iter() {
guard.remove(path);
}

// check if file has been in hashset for more than 5 minutes
let now = Utc::now();
guard.retain(|f| {
!extract_datetime_from_parquet_path_regex(f)
.is_ok_and(|ts| (now - ts).num_minutes() >= 5)
});
// successfully uploaded files, remove from DashMap
for (path, _) in uploaded_files.iter() {
ACTIVE_OBJECT_STORE_SYNC_FILES.remove(path);
}
// Use monotonic time to ensure the 5-minute eviction window(cleanup) is immune to system clock adjustments.
let now = Instant::now();
ACTIVE_OBJECT_STORE_SYNC_FILES.retain(|_, tracked_instant| {
now.duration_since(*tracked_instant) < Duration::from_secs(300)
});
let manifest_files: Vec<_> = uploaded_files
.into_par_iter()
.map(|(path, manifest_file)| {
Expand Down
9 changes: 4 additions & 5 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
*/

use chrono::{TimeDelta, Timelike};
use datafusion::common::HashSet;
use dashmap::DashMap;
use futures::FutureExt;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{RwLock, mpsc, oneshot};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinSet;
use tokio::time::{Duration, Instant, interval_at, sleep};
use tokio::{select, task};
Expand All @@ -35,8 +34,8 @@ use tracing::{Instrument, error, info, info_span, trace, warn};
static LOCAL_SYNC_RUNNING: AtomicBool = AtomicBool::new(false);
static REMOTE_SYNC_RUNNING: AtomicBool = AtomicBool::new(false);

pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy<Arc<RwLock<HashSet<PathBuf>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashSet::new())));
pub static ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy<DashMap<PathBuf, std::time::Instant>> =
Lazy::new(DashMap::new);
/// RAII guard that clears a sync-running flag on drop, so a panic inside the
/// sync body cannot leave the flag stuck at `true` and wedge future ticks.
struct SyncRunningGuard(&'static AtomicBool);
Expand Down
Loading