Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/connect/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ impl Catalog {

let data: &arrow::array::BooleanArray = match col.data_type() {
arrow::datatypes::DataType::Boolean => col.as_boolean(),
_ => unimplemented!("only Boolean data types are currently handled currently."),
_ => {
return Err(SparkError::NotYetImplemented(
"only Boolean data types are currently handled".to_string(),
))
}
};

Ok(data.value(0))
Expand Down
17 changes: 15 additions & 2 deletions crates/connect/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ impl RunTimeConfig {

let resp = self.client.config_request(operation).await?;

let val = resp.pairs.first().unwrap().value().to_string();
let val = resp
.pairs
.first()
.ok_or_else(|| {
SparkError::AnalysisException("config key not found in response".to_string())
})?
.value()
.to_string();

Ok(val)
}
Expand All @@ -136,7 +143,13 @@ impl RunTimeConfig {

let resp = self.client.config_request(operation).await?;

let val = resp.pairs.first().unwrap().value();
let val = resp
.pairs
.first()
.ok_or_else(|| {
SparkError::AnalysisException("config key not found in response".to_string())
})?
.value();

match val {
"true" => Ok(true),
Expand Down
4 changes: 2 additions & 2 deletions crates/connect/src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ gen_func!(spark_partition_id, [], "A column for partition ID.");
#[allow(unused_variables)]
/// Evaluates a list of conditions and returns one of multiple possible result expressions.
fn when(condition: impl Into<Column>, value: Column) -> Column {
unimplemented!("not implemented")
todo!("when() is not yet implemented")
}

/// Computes bitwise not.
Expand Down Expand Up @@ -735,7 +735,7 @@ gen_func!(array_size, [col: Column], "Returns the total number of elements in th

#[allow(unused_variables)]
pub fn array_sort(col: impl Into<Column>, compactor: Option<impl Into<Column>>) -> Column {
unimplemented!()
todo!("array_sort() is not yet implemented")
}

/// adds an item into a given array at a specified array index.
Expand Down
8 changes: 5 additions & 3 deletions crates/connect/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ impl LogicalPlanBuilder {
let mut min_non_nulls = match how {
"all" => Some(1),
"any" => None,
&_ => panic!("'how' arg needs to be 'all' or 'any'"),
&_ => {
panic!("'how' argument must be 'all' or 'any', got '{}'", how)
}
};

if let Some(threshold) = threshold {
Expand Down Expand Up @@ -850,8 +852,8 @@ where
VecExpression::from_iter(cols)
.expr
.into_iter()
.map(|col| match col.expr_type.clone().unwrap() {
spark::expression::ExprType::SortOrder(ord) => *ord,
.map(|col| match col.expr_type.clone() {
Some(spark::expression::ExprType::SortOrder(ord)) => *ord,
_ => spark::expression::SortOrder {
child: Some(Box::new(col)),
direction: 1,
Expand Down
2 changes: 1 addition & 1 deletion crates/connect/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl From<spark::StorageLevel> for StorageLevel {
(true, true, false, false, 2) => StorageLevel::MemoryAndDisk2,
(true, true, true, false, 1) => StorageLevel::OffHeap,
(true, true, false, true, 1) => StorageLevel::MemoryAndDiskDeser,
_ => unimplemented!(),
_ => StorageLevel::None,
}
}
}
Expand Down
98 changes: 91 additions & 7 deletions crates/connect/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Rust Types to Spark Types
#![allow(dead_code)]

use crate::errors::SparkError;
use crate::spark;

/// Represents basic methods for a [SparkDataType]
Expand Down Expand Up @@ -286,12 +287,30 @@ pub enum DataType {
}

impl DataType {
pub fn from_str_name(value: &str) -> DataType {
pub fn from_str_name(value: &str) -> Result<DataType, SparkError> {
match value.to_lowercase().as_str() {
"bool" | "boolean" => DataType::Boolean,
"int" | "integer" => DataType::Integer,
"str" | "string" => DataType::String,
_ => unimplemented!("not implemented"),
"null" | "void" => Ok(DataType::Null),
"binary" => Ok(DataType::Binary),
"bool" | "boolean" => Ok(DataType::Boolean),
"byte" | "tinyint" => Ok(DataType::Byte),
"short" | "smallint" => Ok(DataType::Short),
"int" | "integer" => Ok(DataType::Integer),
"long" | "bigint" => Ok(DataType::Long),
"float" => Ok(DataType::Float),
"double" => Ok(DataType::Double),
"decimal" => Ok(DataType::Decimal {
scale: None,
precision: None,
}),
"str" | "string" => Ok(DataType::String),
"date" => Ok(DataType::Date),
"timestamp" => Ok(DataType::Timestamp),
"timestamp_ntz" => Ok(DataType::TimestampNtz),
"interval" | "calendar_interval" => Ok(DataType::CalendarInterval),
_ => Err(SparkError::NotYetImplemented(format!(
"DataType '{}' is not supported",
value
))),
}
}

Expand Down Expand Up @@ -501,7 +520,7 @@ impl SparkDataType for DataType {
(0, 1) => String::from("interval month to year"),
(1, 1) => String::from("interval month"),
(1, 0) => String::from("interval year to month"),
(_, _) => unimplemented!("Invalid YearMonthInterval"),
(_, _) => String::from("interval year"),
}
}
Self::DayTimeInterval {
Expand All @@ -527,7 +546,7 @@ impl SparkDataType for DataType {
(3, 1) => String::from("interval second to hour"),
(3, 2) => String::from("interval second to minute"),
(3, 3) => String::from("interval second"),
(_, _) => unimplemented!("Invalid DayTimeInterval"),
(_, _) => String::from("interval day"),
}
}
Self::Array {
Expand Down Expand Up @@ -714,4 +733,69 @@ mod tests {

assert_eq!(expected, schema.json());
}

#[test]
fn test_from_str_name() {
assert!(matches!(
DataType::from_str_name("null"),
Ok(DataType::Null)
));
assert!(matches!(
DataType::from_str_name("void"),
Ok(DataType::Null)
));
assert!(matches!(
DataType::from_str_name("boolean"),
Ok(DataType::Boolean)
));
assert!(matches!(
DataType::from_str_name("byte"),
Ok(DataType::Byte)
));
assert!(matches!(
DataType::from_str_name("short"),
Ok(DataType::Short)
));
assert!(matches!(
DataType::from_str_name("integer"),
Ok(DataType::Integer)
));
assert!(matches!(
DataType::from_str_name("long"),
Ok(DataType::Long)
));
assert!(matches!(
DataType::from_str_name("float"),
Ok(DataType::Float)
));
assert!(matches!(
DataType::from_str_name("double"),
Ok(DataType::Double)
));
assert!(matches!(
DataType::from_str_name("string"),
Ok(DataType::String)
));
assert!(matches!(
DataType::from_str_name("date"),
Ok(DataType::Date)
));
assert!(matches!(
DataType::from_str_name("timestamp"),
Ok(DataType::Timestamp)
));
assert!(matches!(
DataType::from_str_name("timestamp_ntz"),
Ok(DataType::TimestampNtz)
));
assert!(matches!(
DataType::from_str_name("binary"),
Ok(DataType::Binary)
));
assert!(matches!(
DataType::from_str_name("decimal"),
Ok(DataType::Decimal { .. })
));
assert!(DataType::from_str_name("unsupported_type").is_err());
}
}