diff --git a/crates/connect/src/session.rs b/crates/connect/src/session.rs index 94184a7..0023c6a 100644 --- a/crates/connect/src/session.rs +++ b/crates/connect/src/session.rs @@ -214,6 +214,100 @@ impl SparkSession { Ok(DataFrame::new(self.session(), logical_plan)) } + /// Returns a [DataFrame] representing the result of the given query with named parameters. + /// + /// Named parameters are referenced in the SQL query using `:name` syntax. + /// + /// # Example + /// ```rust + /// use std::collections::HashMap; + /// use spark_connect_rs::spark::expression::Literal; + /// + /// let mut args = HashMap::new(); + /// args.insert("threshold".to_string(), Literal::from(100i32)); + /// + /// let df = spark.sql_with_args("SELECT * FROM t WHERE value > :threshold", args).await?; + /// ``` + pub async fn sql_with_args( + &self, + sql_query: &str, + args: HashMap, + ) -> Result { + let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand { + sql: sql_query.to_string(), + args, + pos_args: vec![], + }); + + let plan = LogicalPlanBuilder::plan_cmd(sql_cmd); + + let resp = self + .clone() + .client() + .execute_command_and_fetch(plan) + .await?; + + let relation = resp + .sql_command_result + .to_owned() + .ok_or(SparkError::AnalysisException( + "SQL command result is empty".to_string(), + ))? + .relation; + + let logical_plan = LogicalPlanBuilder::new(relation.ok_or( + SparkError::AnalysisException("SQL relation result is empty".to_string()), + )?); + + Ok(DataFrame::new(self.session(), logical_plan)) + } + + /// Returns a [DataFrame] representing the result of the given query with positional parameters. + /// + /// Positional parameters are referenced in the SQL query using `?` syntax. + /// + /// # Example + /// ```rust + /// use spark_connect_rs::spark::expression::Literal; + /// + /// let params = vec![Literal::from(100i32), Literal::from("active".to_string())]; + /// + /// let df = spark.sql_with_pos_args("SELECT * FROM t WHERE value > ? AND status = ?", params).await?; + /// ``` + pub async fn sql_with_pos_args( + &self, + sql_query: &str, + pos_args: Vec, + ) -> Result { + let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand { + sql: sql_query.to_string(), + args: HashMap::default(), + pos_args, + }); + + let plan = LogicalPlanBuilder::plan_cmd(sql_cmd); + + let resp = self + .clone() + .client() + .execute_command_and_fetch(plan) + .await?; + + let relation = resp + .sql_command_result + .to_owned() + .ok_or(SparkError::AnalysisException( + "SQL command result is empty".to_string(), + ))? + .relation; + + let logical_plan = LogicalPlanBuilder::new(relation.ok_or( + SparkError::AnalysisException("SQL relation result is empty".to_string()), + )?); + + Ok(DataFrame::new(self.session(), logical_plan)) + } + pub fn create_dataframe(&self, data: &RecordBatch) -> Result { let logical_plan = LogicalPlanBuilder::local_relation(data)?;