crates/connect/src/streaming/mod.rs (91 additions, 0 deletions)
@@ -20,6 +20,7 @@
use std::collections::HashMap;

use crate::plan::LogicalPlanBuilder;
use crate::readwriter::ConfigOpts;
use crate::session::SparkSession;
use crate::spark;
pub use crate::spark::write_stream_operation_start::Trigger;
@@ -111,6 +112,71 @@ impl DataStreamReader {
plan,
})
}

/// Loads a CSV stream from the given path with the specified options.
pub fn csv<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
self.format = Some("csv".to_string());
self.read_options.extend(config.to_options());
self.load(Some(path))
}

/// Loads a JSON stream from the given path with the specified options.
pub fn json<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
self.format = Some("json".to_string());
self.read_options.extend(config.to_options());
self.load(Some(path))
}

/// Loads an ORC stream from the given path with the specified options.
pub fn orc<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
self.format = Some("orc".to_string());
self.read_options.extend(config.to_options());
self.load(Some(path))
}

/// Loads a Parquet stream from the given path with the specified options.
pub fn parquet<C: ConfigOpts>(
mut self,
path: &str,
config: C,
) -> Result<DataFrame, SparkError> {
self.format = Some("parquet".to_string());
self.read_options.extend(config.to_options());
self.load(Some(path))
}

/// Loads a text stream from the given path with the specified options.
pub fn text<C: ConfigOpts>(mut self, path: &str, config: C) -> Result<DataFrame, SparkError> {
self.format = Some("text".to_string());
self.read_options.extend(config.to_options());
self.load(Some(path))
}

/// Loads a stream from a table.
pub fn table(self, table_name: &str) -> Result<DataFrame, SparkError> {
let read_type = Some(spark::relation::RelType::Read(spark::Read {
is_streaming: true,
read_type: Some(spark::read::ReadType::NamedTable(spark::read::NamedTable {
unparsed_identifier: table_name.to_string(),
options: self.read_options,
})),
}));

let relation = spark::Relation {
common: Some(spark::RelationCommon {
source_info: "NA".to_string(),
plan_id: Some(1),
}),
rel_type: read_type,
};

let plan = LogicalPlanBuilder::new(relation);

Ok(DataFrame {
spark_session: self.spark_session,
plan,
})
}
}

/// Streaming Output Modes
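
To show how the new reader helpers compose with the existing builder, here is a minimal usage sketch. It is not part of this diff: the import paths (other than `ConfigOpts`), the `events` table name, and the example path are illustrative assumptions; only the `csv(path, config)` and `table(name)` calls come from this change.

```rust
// Hypothetical example; import paths other than `crate::readwriter::ConfigOpts`
// are assumptions about the crate layout and are not confirmed by this diff.
use crate::dataframe::DataFrame;
use crate::errors::SparkError;
use crate::readwriter::ConfigOpts;
use crate::streaming::DataStreamReader;

/// Reads a CSV directory as a stream via the new convenience method;
/// equivalent to `.format("csv")`, merging `opts` into the read options,
/// then `.load(Some(path))`.
fn read_csv_stream<C: ConfigOpts>(
    reader: DataStreamReader,
    opts: C,
) -> Result<DataFrame, SparkError> {
    reader.csv("/data/incoming/csv", opts)
}

/// Defines a streaming read over a named table instead of a path.
fn read_table_stream(reader: DataStreamReader) -> Result<DataFrame, SparkError> {
    reader.table("events")
}
```
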
@@ -277,6 +343,31 @@ impl DataStreamWriter {

self.start_stream(sink).await
}

/// Starts a CSV streaming query writing to the given path.
pub async fn csv(self, path: &str) -> Result<StreamingQuery, SparkError> {
self.format("csv").start(Some(path)).await
}

/// Starts a JSON streaming query writing to the given path.
pub async fn json(self, path: &str) -> Result<StreamingQuery, SparkError> {
self.format("json").start(Some(path)).await
}

/// Starts an ORC streaming query writing to the given path.
pub async fn orc(self, path: &str) -> Result<StreamingQuery, SparkError> {
self.format("orc").start(Some(path)).await
}

/// Starts a Parquet streaming query writing to the given path.
pub async fn parquet(self, path: &str) -> Result<StreamingQuery, SparkError> {
self.format("parquet").start(Some(path)).await
}

/// Starts a text streaming query writing to the given path.
pub async fn text(self, path: &str) -> Result<StreamingQuery, SparkError> {
self.format("text").start(Some(path)).await
}
}

/// Represents the active streaming created from a `start` on the writer
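
A corresponding hedged sketch for the new writer helpers is below. Again, the import paths and the output path are illustrative assumptions; in a real pipeline the writer would typically also be configured with a checkpoint location and an output mode before the query is started.

```rust
// Hypothetical example; import paths are assumptions about the crate layout.
use crate::errors::SparkError;
use crate::streaming::{DataStreamWriter, StreamingQuery};

/// Starts a Parquet file sink at the given path; equivalent to
/// `writer.format("parquet").start(Some(path)).await`, using the new
/// convenience method from this change.
async fn write_parquet_stream(
    writer: DataStreamWriter,
) -> Result<StreamingQuery, SparkError> {
    writer.parquet("/data/out/events").await
}
```
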