From 66919651eea3fd1de6cc59d3cd71c4772e8cdc97 Mon Sep 17 00:00:00 2001
From: Rafael Fernandez
Date: Sat, 28 Mar 2026 19:31:11 +0100
Subject: [PATCH] [SPARK-53314] Add DataStreamReader and DataStreamWriter
 convenience methods

---
 crates/connect/src/streaming/mod.rs | 91 +++++++++++++++++++++++++++++
 1 file changed, 91 insertions(+)

diff --git a/crates/connect/src/streaming/mod.rs b/crates/connect/src/streaming/mod.rs
index 5feb4ce..4137ff4 100644
--- a/crates/connect/src/streaming/mod.rs
+++ b/crates/connect/src/streaming/mod.rs
@@ -20,6 +20,7 @@
 use std::collections::HashMap;
 
 use crate::plan::LogicalPlanBuilder;
+use crate::readwriter::ConfigOpts;
 use crate::session::SparkSession;
 use crate::spark;
 pub use crate::spark::write_stream_operation_start::Trigger;
@@ -111,6 +112,71 @@ impl DataStreamReader {
             plan,
         })
     }
+
+    /// Loads a CSV stream from the given path with the specified options.
+    pub fn csv<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
+        self.format = Some("csv".to_string());
+        self.read_options.extend(config.to_options());
+        self.load(Some(path))
+    }
+
+    /// Loads a JSON stream from the given path with the specified options.
+    pub fn json<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
+        self.format = Some("json".to_string());
+        self.read_options.extend(config.to_options());
+        self.load(Some(path))
+    }
+
+    /// Loads an ORC stream from the given path with the specified options.
+    pub fn orc<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
+        self.format = Some("orc".to_string());
+        self.read_options.extend(config.to_options());
+        self.load(Some(path))
+    }
+
+    /// Loads a Parquet stream from the given path with the specified options.
+    pub fn parquet<C: ConfigOpts>(
+        mut self,
+        path: &str,
+        config: C,
+    ) -> Result<DataFrame, SparkError> {
+        self.format = Some("parquet".to_string());
+        self.read_options.extend(config.to_options());
+        self.load(Some(path))
+    }
+
+    /// Loads a text stream from the given path with the specified options.
+    pub fn text<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
+        self.format = Some("text".to_string());
+        self.read_options.extend(config.to_options());
+        self.load(Some(path))
+    }
+
+    /// Loads a stream from a table.
+    pub fn table(self, table_name: &str) -> Result<DataFrame, SparkError> {
+        let read_type = Some(spark::relation::RelType::Read(spark::Read {
+            is_streaming: true,
+            read_type: Some(spark::read::ReadType::NamedTable(spark::read::NamedTable {
+                unparsed_identifier: table_name.to_string(),
+                options: self.read_options,
+            })),
+        }));
+
+        let relation = spark::Relation {
+            common: Some(spark::RelationCommon {
+                source_info: "NA".to_string(),
+                plan_id: Some(1),
+            }),
+            rel_type: read_type,
+        };
+
+        let plan = LogicalPlanBuilder::new(relation);
+
+        Ok(DataFrame {
+            spark_session: self.spark_session,
+            plan,
+        })
+    }
 }
 
 /// Streaming Output Modes
@@ -277,6 +343,31 @@ impl DataStreamWriter {
         self.start_stream(sink).await
     }
+
+    /// Starts a CSV streaming query writing to the given path.
+    pub async fn csv(self, path: &str) -> Result<StreamingQuery, SparkError> {
+        self.format("csv").start(Some(path)).await
+    }
+
+    /// Starts a JSON streaming query writing to the given path.
+    pub async fn json(self, path: &str) -> Result<StreamingQuery, SparkError> {
+        self.format("json").start(Some(path)).await
+    }
+
+    /// Starts an ORC streaming query writing to the given path.
+    pub async fn orc(self, path: &str) -> Result<StreamingQuery, SparkError> {
+        self.format("orc").start(Some(path)).await
+    }
+
+    /// Starts a Parquet streaming query writing to the given path.
+    pub async fn parquet(self, path: &str) -> Result<StreamingQuery, SparkError> {
+        self.format("parquet").start(Some(path)).await
+    }
+
+    /// Starts a text streaming query writing to the given path.
+    pub async fn text(self, path: &str) -> Result<StreamingQuery, SparkError> {
+        self.format("text").start(Some(path)).await
+    }
 }
 
 /// Represents the active streaming created from a `start` on the writer