diff --git a/crates/connect/src/session.rs b/crates/connect/src/session.rs
index 94184a7..f259846 100644
--- a/crates/connect/src/session.rs
+++ b/crates/connect/src/session.rs
@@ -220,6 +220,34 @@ impl SparkSession {
         Ok(DataFrame::new(self.session(), logical_plan))
     }
 
+    /// Build a Spark [DataFrame] out of a Polars DataFrame.
+    ///
+    /// Each Polars column is rechunked, exported as a polars-arrow array, and
+    /// converted into an arrow-rs array; the resulting named arrays form a single
+    /// [RecordBatch] that is handed to `create_dataframe`.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error from `RecordBatch::try_from_iter`, as well as
+    /// whatever `create_dataframe` returns.
+    #[cfg(feature = "polars")]
+    pub fn create_dataframe_from_polars(
+        &self,
+        df: &polars::frame::DataFrame,
+    ) -> Result {
+        let named_arrays = df.get_columns().iter().map(|column| {
+            // After rechunking there is a single chunk, so index 0 covers every row.
+            let contiguous = column.rechunk();
+            let boxed = contiguous.to_arrow(0, polars::datatypes::CompatLevel::oldest());
+            // The polars-arrow -> arrow-rs conversion relies on the `arrow_rs` feature.
+            let converted: arrow::array::ArrayRef = boxed.into();
+            (column.name().to_string(), converted)
+        });
+
+        let batch = RecordBatch::try_from_iter(named_arrays)?;
+        self.create_dataframe(&batch)
+    }
+
     /// Return the session ID
     pub fn session_id(&self) -> &str {
         &self.session_id