Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions crates/connect/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,100 @@ impl SparkSession {
Ok(DataFrame::new(self.session(), logical_plan))
}

/// Returns a [DataFrame] representing the result of the given query with named parameters.
///
/// Named parameters are referenced in the SQL query using `:name` syntax.
///
/// # Example
/// ```rust
/// use std::collections::HashMap;
/// use spark_connect_rs::spark::expression::Literal;
///
/// let mut args = HashMap::new();
/// args.insert("threshold".to_string(), Literal::from(100i32));
///
/// let df = spark.sql_with_args("SELECT * FROM t WHERE value > :threshold", args).await?;
/// ```
pub async fn sql_with_args(
&self,
sql_query: &str,
args: HashMap<String, spark::expression::Literal>,
) -> Result<DataFrame, SparkError> {
let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand {
sql: sql_query.to_string(),
args,
pos_args: vec![],
});

let plan = LogicalPlanBuilder::plan_cmd(sql_cmd);

let resp = self
.clone()
.client()
.execute_command_and_fetch(plan)
.await?;

let relation = resp
.sql_command_result
.to_owned()
.ok_or(SparkError::AnalysisException(
"SQL command result is empty".to_string(),
))?
.relation;

let logical_plan = LogicalPlanBuilder::new(relation.ok_or(
SparkError::AnalysisException("SQL relation result is empty".to_string()),
)?);

Ok(DataFrame::new(self.session(), logical_plan))
}

/// Returns a [DataFrame] representing the result of the given query with positional parameters.
///
/// Positional parameters are referenced in the SQL query using `?` syntax.
///
/// # Example
/// ```rust
/// use spark_connect_rs::spark::expression::Literal;
///
/// let params = vec![Literal::from(100i32), Literal::from("active".to_string())];
///
/// let df = spark.sql_with_pos_args("SELECT * FROM t WHERE value > ? AND status = ?", params).await?;
/// ```
pub async fn sql_with_pos_args(
&self,
sql_query: &str,
pos_args: Vec<spark::expression::Literal>,
) -> Result<DataFrame, SparkError> {
let sql_cmd = spark::command::CommandType::SqlCommand(spark::SqlCommand {
sql: sql_query.to_string(),
args: HashMap::default(),
pos_args,
});

let plan = LogicalPlanBuilder::plan_cmd(sql_cmd);

let resp = self
.clone()
.client()
.execute_command_and_fetch(plan)
.await?;

let relation = resp
.sql_command_result
.to_owned()
.ok_or(SparkError::AnalysisException(
"SQL command result is empty".to_string(),
))?
.relation;

let logical_plan = LogicalPlanBuilder::new(relation.ok_or(
SparkError::AnalysisException("SQL relation result is empty".to_string()),
)?);

Ok(DataFrame::new(self.session(), logical_plan))
}

pub fn create_dataframe(&self, data: &RecordBatch) -> Result<DataFrame, SparkError> {
let logical_plan = LogicalPlanBuilder::local_relation(data)?;

Expand Down