From 4a9351d9384b61fdc12377a447443857a05e3253 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sun, 29 Mar 2026 09:19:29 +0200 Subject: [PATCH] [SPARK-52428] Harden miscellaneous module error handling --- crates/connect/src/catalog.rs | 6 +- crates/connect/src/conf.rs | 17 ++++- crates/connect/src/functions/mod.rs | 4 +- crates/connect/src/plan.rs | 8 ++- crates/connect/src/storage.rs | 2 +- crates/connect/src/types.rs | 98 ++++++++++++++++++++++++++--- 6 files changed, 119 insertions(+), 16 deletions(-) diff --git a/crates/connect/src/catalog.rs b/crates/connect/src/catalog.rs index 22ae1fe..13a423b 100644 --- a/crates/connect/src/catalog.rs +++ b/crates/connect/src/catalog.rs @@ -44,7 +44,11 @@ impl Catalog { let data: &arrow::array::BooleanArray = match col.data_type() { arrow::datatypes::DataType::Boolean => col.as_boolean(), - _ => unimplemented!("only Boolean data types are currently handled currently."), + _ => { + return Err(SparkError::NotYetImplemented( + "only Boolean data types are currently handled".to_string(), + )) + } }; Ok(data.value(0)) diff --git a/crates/connect/src/conf.rs b/crates/connect/src/conf.rs index 7baa1bd..092c36f 100644 --- a/crates/connect/src/conf.rs +++ b/crates/connect/src/conf.rs @@ -118,7 +118,14 @@ impl RunTimeConfig { let resp = self.client.config_request(operation).await?; - let val = resp.pairs.first().unwrap().value().to_string(); + let val = resp + .pairs + .first() + .ok_or_else(|| { + SparkError::AnalysisException("config key not found in response".to_string()) + })? + .value() + .to_string(); Ok(val) } @@ -136,7 +143,13 @@ impl RunTimeConfig { let resp = self.client.config_request(operation).await?; - let val = resp.pairs.first().unwrap().value(); + let val = resp + .pairs + .first() + .ok_or_else(|| { + SparkError::AnalysisException("config key not found in response".to_string()) + })? + .value(); match val { "true" => Ok(true), diff --git a/crates/connect/src/functions/mod.rs b/crates/connect/src/functions/mod.rs index b518b4d..4077152 100644 --- a/crates/connect/src/functions/mod.rs +++ b/crates/connect/src/functions/mod.rs @@ -147,7 +147,7 @@ gen_func!(spark_partition_id, [], "A column for partition ID."); #[allow(unused_variables)] /// Evaluates a list of conditions and returns one of multiple possible result expressions. fn when(condition: impl Into, value: Column) -> Column { - unimplemented!("not implemented") + todo!("when() is not yet implemented") } /// Computes bitwise not. @@ -735,7 +735,7 @@ gen_func!(array_size, [col: Column], "Returns the total number of elements in th #[allow(unused_variables)] pub fn array_sort(col: impl Into, compactor: Option>) -> Column { - unimplemented!() + todo!("array_sort() is not yet implemented") } /// adds an item into a given array at a specified array index. diff --git a/crates/connect/src/plan.rs b/crates/connect/src/plan.rs index 30fabb5..9b960a4 100644 --- a/crates/connect/src/plan.rs +++ b/crates/connect/src/plan.rs @@ -289,7 +289,9 @@ impl LogicalPlanBuilder { let mut min_non_nulls = match how { "all" => Some(1), "any" => None, - &_ => panic!("'how' arg needs to be 'all' or 'any'"), + &_ => { + panic!("'how' argument must be 'all' or 'any', got '{}'", how) + } }; if let Some(threshold) = threshold { @@ -850,8 +852,8 @@ where VecExpression::from_iter(cols) .expr .into_iter() - .map(|col| match col.expr_type.clone().unwrap() { - spark::expression::ExprType::SortOrder(ord) => *ord, + .map(|col| match col.expr_type.clone() { + Some(spark::expression::ExprType::SortOrder(ord)) => *ord, _ => spark::expression::SortOrder { child: Some(Box::new(col)), direction: 1, diff --git a/crates/connect/src/storage.rs b/crates/connect/src/storage.rs index 0ea7638..c1828dc 100644 --- a/crates/connect/src/storage.rs +++ b/crates/connect/src/storage.rs @@ -52,7 +52,7 @@ impl From for StorageLevel { (true, true, false, false, 2) => StorageLevel::MemoryAndDisk2, (true, true, true, false, 1) => StorageLevel::OffHeap, (true, true, false, true, 1) => StorageLevel::MemoryAndDiskDeser, - _ => unimplemented!(), + _ => StorageLevel::None, } } } diff --git a/crates/connect/src/types.rs b/crates/connect/src/types.rs index e6357b3..9108245 100644 --- a/crates/connect/src/types.rs +++ b/crates/connect/src/types.rs @@ -18,6 +18,7 @@ //! Rust Types to Spark Types #![allow(dead_code)] +use crate::errors::SparkError; use crate::spark; /// Represents basic methods for a [SparkDataType] @@ -286,12 +287,30 @@ pub enum DataType { } impl DataType { - pub fn from_str_name(value: &str) -> DataType { + pub fn from_str_name(value: &str) -> Result { match value.to_lowercase().as_str() { - "bool" | "boolean" => DataType::Boolean, - "int" | "integer" => DataType::Integer, - "str" | "string" => DataType::String, - _ => unimplemented!("not implemented"), + "null" | "void" => Ok(DataType::Null), + "binary" => Ok(DataType::Binary), + "bool" | "boolean" => Ok(DataType::Boolean), + "byte" | "tinyint" => Ok(DataType::Byte), + "short" | "smallint" => Ok(DataType::Short), + "int" | "integer" => Ok(DataType::Integer), + "long" | "bigint" => Ok(DataType::Long), + "float" => Ok(DataType::Float), + "double" => Ok(DataType::Double), + "decimal" => Ok(DataType::Decimal { + scale: None, + precision: None, + }), + "str" | "string" => Ok(DataType::String), + "date" => Ok(DataType::Date), + "timestamp" => Ok(DataType::Timestamp), + "timestamp_ntz" => Ok(DataType::TimestampNtz), + "interval" | "calendar_interval" => Ok(DataType::CalendarInterval), + _ => Err(SparkError::NotYetImplemented(format!( + "DataType '{}' is not supported", + value + ))), } } @@ -501,7 +520,7 @@ impl SparkDataType for DataType { (0, 1) => String::from("interval month to year"), (1, 1) => String::from("interval month"), (1, 0) => String::from("interval year to month"), - (_, _) => unimplemented!("Invalid YearMonthInterval"), + (_, _) => String::from("interval year"), } } Self::DayTimeInterval { @@ -527,7 +546,7 @@ impl SparkDataType for DataType { (3, 1) => String::from("interval second to hour"), (3, 2) => String::from("interval second to minute"), (3, 3) => String::from("interval second"), - (_, _) => unimplemented!("Invalid DayTimeInterval"), + (_, _) => String::from("interval day"), } } Self::Array { @@ -714,4 +733,69 @@ mod tests { assert_eq!(expected, schema.json()); } + + #[test] + fn test_from_str_name() { + assert!(matches!( + DataType::from_str_name("null"), + Ok(DataType::Null) + )); + assert!(matches!( + DataType::from_str_name("void"), + Ok(DataType::Null) + )); + assert!(matches!( + DataType::from_str_name("boolean"), + Ok(DataType::Boolean) + )); + assert!(matches!( + DataType::from_str_name("byte"), + Ok(DataType::Byte) + )); + assert!(matches!( + DataType::from_str_name("short"), + Ok(DataType::Short) + )); + assert!(matches!( + DataType::from_str_name("integer"), + Ok(DataType::Integer) + )); + assert!(matches!( + DataType::from_str_name("long"), + Ok(DataType::Long) + )); + assert!(matches!( + DataType::from_str_name("float"), + Ok(DataType::Float) + )); + assert!(matches!( + DataType::from_str_name("double"), + Ok(DataType::Double) + )); + assert!(matches!( + DataType::from_str_name("string"), + Ok(DataType::String) + )); + assert!(matches!( + DataType::from_str_name("date"), + Ok(DataType::Date) + )); + assert!(matches!( + DataType::from_str_name("timestamp"), + Ok(DataType::Timestamp) + )); + assert!(matches!( + DataType::from_str_name("timestamp_ntz"), + Ok(DataType::TimestampNtz) + )); + assert!(matches!( + DataType::from_str_name("binary"), + Ok(DataType::Binary) + )); + assert!(matches!( + DataType::from_str_name("decimal"), + Ok(DataType::Decimal { .. }) + )); + assert!(DataType::from_str_name("unsupported_type").is_err()); + } }