diff --git a/src/event/mod.rs b/src/event/mod.rs index f9955e986..a4855332c 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -28,7 +28,6 @@ use tracing::{info_span, instrument}; use self::error::EventError; use crate::{ - LOCK_EXPECT, handlers::TelemetryType, metadata::update_stats, metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date}, @@ -157,7 +156,7 @@ pub fn commit_schema( tenant_id: &Option, ) -> Result<(), StagingError> { let _span = info_span!("commit_schema", stream_name).entered(); - let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned"); + let mut stream_metadata = PARSEABLE.streams.write(); let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let map = &mut stream_metadata .get_mut(tenant_id) @@ -166,7 +165,6 @@ pub fn commit_schema( .ok_or_else(|| StreamNotFound(stream_name.to_string()))? .metadata .write() - .expect(LOCK_EXPECT) .schema; let current_schema = Schema::new(map.values().cloned().collect::()); let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 9fc2ab299..d057527e7 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -35,7 +35,7 @@ use crate::utils::get_tenant_id_from_request; use crate::utils::json::flatten::{ self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels, }; -use crate::{LOCK_EXPECT, stats, validator}; +use crate::{stats, validator}; use actix_web::http::StatusCode; use actix_web::web::{Json, Path}; @@ -397,15 +397,14 @@ pub async fn get_stream_info( }; let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let hash_map = PARSEABLE.streams.read().unwrap(); + let hash_map = PARSEABLE.streams.read(); let stream_meta = hash_map .get(tenant_id) .ok_or_else(|| TenantNotFound(tenant_id.to_owned()))? .get(&stream_name) .ok_or_else(|| StreamNotFound(stream_name.clone()))? .metadata - .read() - .expect(LOCK_EXPECT); + .read(); let stream_info = StreamInfo::from_metadata(&stream_meta, stream_first_event_at, stream_latest_event_at); diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 6e6bb370a..1e5072a8e 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -636,7 +636,7 @@ pub async fn initialize_hot_tier_metadata_on_startup( ) -> anyhow::Result<()> { // Collect hot tier configurations from streams before doing async operations let hot_tier_configs: Vec<(String, Option, StreamHotTier)> = { - let tenants_guard = PARSEABLE.streams.read().unwrap(); + let tenants_guard = PARSEABLE.streams.read(); tenants_guard .iter() .flat_map(|(tenant_id, streams)| { diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index b2eef36ea..12fb50750 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -280,7 +280,6 @@ impl Parseable { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); self.streams .read() - .unwrap() .get(tenant_id) .ok_or_else(|| StreamNotFound(format!("{stream_name} with tenant {tenant_id}"))) .map(|v| v.get(stream_name))? @@ -1219,9 +1218,7 @@ impl Parseable { write_user_groups().remove(tenant_id); // delete streams - if let Ok(mut streams) = PARSEABLE.streams.write() { - streams.remove(tenant_id); - } + PARSEABLE.streams.write().remove(tenant_id); // delete from in-mem if let Ok(mut tenants) = self.tenants.write() { diff --git a/src/parseable/staging/mod.rs b/src/parseable/staging/mod.rs index 5d1a3a071..85f6d8986 100644 --- a/src/parseable/staging/mod.rs +++ b/src/parseable/staging/mod.rs @@ -17,8 +17,6 @@ * */ -use std::sync::PoisonError; - use crate::{parseable::StreamNotFound, tenants::TenantNotFound}; pub mod reader; @@ -38,6 +36,4 @@ pub enum StagingError { StreamNotFound(#[from] StreamNotFound), #[error("{0}")] TenantNotFound(#[from] TenantNotFound), - #[error("{0}")] - PoisonError(#[from] PoisonError), } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 6c75dec75..8e1541da2 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -23,6 +23,7 @@ use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::derive::{Deref, DerefMut}; use itertools::Itertools; +use parking_lot::{Mutex, RwLock}; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -33,13 +34,12 @@ use parquet::{ schema::types::ColumnPath, }; use relative_path::RelativePathBuf; -use std::sync::PoisonError; use std::{ collections::{HashMap, HashSet}, fs::{self, File, OpenOptions, remove_file, write}, num::NonZeroU32, path::{Path, PathBuf}, - sync::{Arc, Mutex, RwLock}, + sync::Arc, time::{Instant, SystemTime, UNIX_EPOCH}, }; use tokio::task::JoinSet; @@ -47,7 +47,7 @@ use tracing::{Instrument, error, info, info_span, instrument, trace, warn}; use ulid::Ulid; use crate::{ - LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, + OBJECT_STORE_DATA_GRANULARITY, cli::Options, event::{ DEFAULT_TIMESTAMP_KEY, @@ -155,20 +155,7 @@ impl Stream { let mut guard = { let _lock_span = info_span!("acquire_writer_lock").entered(); - match self.writer.lock() { - Ok(guard) => guard, - Err(poisoned) => { - error!( - "Writer lock poisoned while ingesting data for stream {}", - self.stream_name - ); - - return Err(StagingError::PoisonError(PoisonError::new(format!( - "Writer lock poisoned while ingesting data for stream {} - {}", - self.stream_name, poisoned - )))); - } - } + self.writer.lock() }; if self.options.mode != Mode::Query || stream_type == StreamType::Internal { let filename = @@ -533,11 +520,11 @@ impl Stream { } pub fn recordbatches_cloned(&self, schema: &Arc) -> Vec { - self.writer.lock().unwrap().mem.recordbatch_cloned(schema) + self.writer.lock().mem.recordbatch_cloned(schema) } pub fn clear(&self) { - self.writer.lock().unwrap().mem.clear(); + self.writer.lock().mem.clear(); } pub fn flush(&self, forced: bool) { @@ -546,12 +533,7 @@ impl Stream { // DiskWriter::Drop does I/O (IPC finish + file rename) so dropping // outside the lock avoids blocking concurrent push() calls. let stale_writers = { - let mut writer = self.writer.lock().unwrap_or_else(|_| { - panic!( - "Writer lock poisoned while flushing data for stream {}", - self.stream_name - ) - }); + let mut writer = self.writer.lock(); writer.mem.clear(); let mut old_disk = HashMap::new(); @@ -975,54 +957,39 @@ impl Stream { /// Stores the provided stream metadata in memory mapping pub async fn set_metadata(&self, updated_metadata: LogStreamMetadata) { - *self.metadata.write().expect(LOCK_EXPECT) = updated_metadata; + *self.metadata.write() = updated_metadata; } pub fn get_first_event(&self) -> Option { - self.metadata - .read() - .expect(LOCK_EXPECT) - .first_event_at - .clone() + self.metadata.read().first_event_at.clone() } pub fn get_time_partition(&self) -> Option { - self.metadata - .read() - .expect(LOCK_EXPECT) - .time_partition - .clone() + self.metadata.read().time_partition.clone() } pub fn get_time_partition_limit(&self) -> Option { - self.metadata - .read() - .expect(LOCK_EXPECT) - .time_partition_limit + self.metadata.read().time_partition_limit } pub fn get_custom_partition(&self) -> Option { - self.metadata - .read() - .expect(LOCK_EXPECT) - .custom_partition - .clone() + self.metadata.read().custom_partition.clone() } pub fn get_static_schema_flag(&self) -> bool { - self.metadata.read().expect(LOCK_EXPECT).static_schema_flag + self.metadata.read().static_schema_flag } pub fn get_retention(&self) -> Option { - self.metadata.read().expect(LOCK_EXPECT).retention.clone() + self.metadata.read().retention.clone() } pub fn get_schema_version(&self) -> SchemaVersion { - self.metadata.read().expect(LOCK_EXPECT).schema_version + self.metadata.read().schema_version } pub fn get_schema(&self) -> Arc { - let metadata = self.metadata.read().expect(LOCK_EXPECT); + let metadata = self.metadata.read(); // sort fields on read from hashmap as order of fields can differ. // This provides a stable output order if schema is same between calls to this function @@ -1037,15 +1004,15 @@ impl Stream { } pub fn get_schema_raw(&self) -> HashMap> { - self.metadata.read().expect(LOCK_EXPECT).schema.clone() + self.metadata.read().schema.clone() } pub fn set_retention(&self, retention: Retention) { - self.metadata.write().expect(LOCK_EXPECT).retention = Some(retention); + self.metadata.write().retention = Some(retention); } pub fn set_first_event_at(&self, first_event_at: &str) { - self.metadata.write().expect(LOCK_EXPECT).first_event_at = Some(first_event_at.to_owned()); + self.metadata.write().first_event_at = Some(first_event_at.to_owned()); } /// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata. @@ -1073,80 +1040,65 @@ impl Stream { /// } /// ``` pub fn reset_first_event_at(&self) { - self.metadata - .write() - .expect(LOCK_EXPECT) - .first_event_at - .take(); + self.metadata.write().first_event_at.take(); } pub fn set_time_partition_limit(&self, time_partition_limit: NonZeroU32) { - self.metadata - .write() - .expect(LOCK_EXPECT) - .time_partition_limit = Some(time_partition_limit); + self.metadata.write().time_partition_limit = Some(time_partition_limit); } pub fn set_custom_partition(&self, custom_partition: Option<&String>) { - self.metadata.write().expect(LOCK_EXPECT).custom_partition = custom_partition.cloned(); + self.metadata.write().custom_partition = custom_partition.cloned(); } pub fn get_infer_timestamp(&self) -> bool { - self.metadata.read().expect(LOCK_EXPECT).infer_timestamp + self.metadata.read().infer_timestamp } pub fn set_hot_tier(&self, hot_tier: Option) { - let mut metadata = self.metadata.write().expect(LOCK_EXPECT); + let mut metadata = self.metadata.write(); metadata.hot_tier.clone_from(&hot_tier); metadata.hot_tier_enabled = hot_tier.is_some(); } pub fn get_hot_tier(&self) -> Option { - self.metadata.read().expect(LOCK_EXPECT).hot_tier.clone() + self.metadata.read().hot_tier.clone() } pub fn is_hot_tier_enabled(&self) -> bool { - self.metadata.read().expect(LOCK_EXPECT).hot_tier_enabled + self.metadata.read().hot_tier_enabled } pub fn get_stream_type(&self) -> StreamType { - self.metadata.read().expect(LOCK_EXPECT).stream_type + self.metadata.read().stream_type } pub fn set_log_source(&self, log_source: Vec) { - self.metadata.write().expect(LOCK_EXPECT).log_source = log_source; + self.metadata.write().log_source = log_source; } pub fn get_log_source(&self) -> Vec { - self.metadata.read().expect(LOCK_EXPECT).log_source.clone() + self.metadata.read().log_source.clone() } pub fn get_dataset_tags(&self) -> Vec { - self.metadata - .read() - .expect(LOCK_EXPECT) - .dataset_tags - .clone() + self.metadata.read().dataset_tags.clone() } pub fn get_dataset_labels(&self) -> Vec { - self.metadata - .read() - .expect(LOCK_EXPECT) - .dataset_labels - .clone() + self.metadata.read().dataset_labels.clone() } pub fn set_dataset_tags(&self, tags: Vec) { - self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags; + self.metadata.write().dataset_tags = tags; } pub fn set_dataset_labels(&self, labels: Vec) { - self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels; + self.metadata.write().dataset_labels = labels; } pub fn add_log_source(&self, log_source: LogSourceEntry) { - let metadata = self.metadata.read().expect(LOCK_EXPECT); + let metadata = self.metadata.read(); for existing in &metadata.log_source { if existing.log_source_format == log_source.log_source_format { drop(metadata); @@ -1159,7 +1111,7 @@ impl Stream { } drop(metadata); - let mut metadata = self.metadata.write().expect(LOCK_EXPECT); + let mut metadata = self.metadata.write(); for existing in &metadata.log_source { if existing.log_source_format == log_source.log_source_format { self.add_fields_to_log_source( @@ -1173,7 +1125,7 @@ impl Stream { } pub fn add_fields_to_log_source(&self, log_source: &LogSource, fields: HashSet) { - let mut metadata = self.metadata.write().expect(LOCK_EXPECT); + let mut metadata = self.metadata.write(); for log_source_entry in metadata.log_source.iter_mut() { if log_source_entry.log_source_format == *log_source { log_source_entry.fields.extend(fields); @@ -1183,7 +1135,7 @@ impl Stream { } pub fn get_fields_from_log_source(&self, log_source: &LogSource) -> Option> { - let metadata = self.metadata.read().expect(LOCK_EXPECT); + let metadata = self.metadata.read(); for log_source_entry in metadata.log_source.iter() { if log_source_entry.log_source_format == *log_source { return Some(log_source_entry.fields.clone()); @@ -1349,7 +1301,7 @@ impl Streams { ingestor_id: Option, tenant_id: &Option, ) -> StreamRef { - let mut guard = self.write().expect(LOCK_EXPECT); + let mut guard = self.write(); let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); @@ -1370,16 +1322,16 @@ impl Streams { /// TODO: validate possibility of stream continuing to exist despite being deleted pub fn delete(&self, stream_name: &str, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let mut guard = self.write().expect(LOCK_EXPECT); + let mut guard = self.write(); if let Some(tenant_streams) = guard.get_mut(tenant_id) { tenant_streams.remove(stream_name); } - // self.write().expect(LOCK_EXPECT).remove(stream_name); + // self.write().remove(stream_name); } pub fn contains(&self, stream_name: &str, tenant_id: &Option) -> bool { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(tenant) = self.read().expect(LOCK_EXPECT).get(tenant_id) { + if let Some(tenant) = self.read().get(tenant_id) { tenant.contains_key(stream_name) } else { false @@ -1388,11 +1340,7 @@ impl Streams { /// Returns the number of logstreams that parseable is aware of pub fn len(&self) -> usize { - self.read() - .expect(LOCK_EXPECT) - .iter() - .map(|map| map.1.len()) - .sum() + self.read().iter().map(|map| map.1.len()).sum() } /// Returns true if parseable is not aware of any streams @@ -1404,7 +1352,7 @@ impl Streams { pub fn list(&self, tenant_id: &Option) -> Vec { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let guard = self.read().expect(LOCK_EXPECT); + let guard = self.read(); if let Some(tenant_streams) = guard.get(tenant_id) { tenant_streams.keys().map(String::clone).collect() } else { @@ -1422,14 +1370,12 @@ impl Streams { } pub fn list_internal_streams(&self, tenant_id: &Option) -> Vec { - let map = self.read().expect(LOCK_EXPECT); + let map = self.read(); let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); if let Some(tenant_streams) = map.get(tenant_id) { tenant_streams .iter() - .filter(|(_, stream)| { - stream.metadata.read().expect(LOCK_EXPECT).stream_type == StreamType::Internal - }) + .filter(|(_, stream)| stream.metadata.read().stream_type == StreamType::Internal) .map(|(k, _)| k.clone()) .collect() } else { @@ -1452,7 +1398,7 @@ impl Streams { }; for tenant_id in tenants { - let guard = self.read().expect(LOCK_EXPECT); + let guard = self.read(); let streams: Vec> = if let Some(tenant_streams) = guard.get(&tenant_id) { tenant_streams.values().map(Arc::clone).collect() } else { @@ -1975,7 +1921,7 @@ mod tests { assert!(Arc::ptr_eq(&stream1, &stream2)); // Verify the map contains only one entry - let guard = streams.read().expect("Failed to acquire read lock"); + let guard = streams.read(); assert_eq!(guard.len(), 1); } @@ -1988,7 +1934,7 @@ mod tests { let ingestor_id = Some("new_ingestor".to_owned()); // Assert the stream doesn't exist already - let mut guard = streams.write().expect("Failed to acquire read lock"); + let mut guard = streams.write(); assert_eq!(guard.len(), 0); assert!( !guard @@ -2011,7 +1957,7 @@ mod tests { assert_eq!(stream.ingestor_id, ingestor_id); // Assert that the stream is created - let guard = streams.read().expect("Failed to acquire read lock"); + let guard = streams.read(); assert_eq!(guard.len(), 1); assert!(guard.get(DEFAULT_TENANT).unwrap().contains_key(stream_name)); } @@ -2058,7 +2004,7 @@ mod tests { assert!(Arc::ptr_eq(&stream1, &stream2)); // Verify the map contains only one entry - let guard = streams.read().expect("Failed to acquire read lock"); + let guard = streams.read(); assert_eq!(guard.len(), 1); } } diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index ec7e62fab..202f1f8b5 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -177,7 +177,7 @@ pub async fn get_stream_info_helper( }; let stream = PARSEABLE.get_stream(stream_name, tenant_id)?; - let stream_meta = stream.metadata.read().unwrap(); + let stream_meta = stream.metadata.read(); let stream_info = StreamInfo::from_metadata(&stream_meta, stream_first_event_at, stream_latest_event_at); diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 652e78ed4..0e32545ab 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -132,7 +132,6 @@ pub fn init(metadata: &StorageMetadata) { PARSEABLE .streams .write() - .unwrap() .entry(DEFAULT_TENANT.to_owned()) .or_default();