From 64412957ff9b741b3b57bce19124f204caf5e117 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sat, 28 Mar 2026 19:31:42 +0100 Subject: [PATCH] [SPARK-53313] Add JDBC Reader and Writer support --- crates/connect/src/readwriter.rs | 55 ++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/crates/connect/src/readwriter.rs b/crates/connect/src/readwriter.rs index 5d1a78e..1851191 100644 --- a/crates/connect/src/readwriter.rs +++ b/crates/connect/src/readwriter.rs @@ -382,6 +382,47 @@ define_file_options! { } } +define_file_options! { + /// Options for configuring JDBC data source reads and writes. + /// + /// # Options + /// + /// - `url`: JDBC connection URL (e.g., `jdbc:postgresql://host:port/database`). + /// - `dbtable`: The JDBC table to read from or write to. Can also be a subquery in parentheses. + /// - `driver`: JDBC driver class name (e.g., `org.postgresql.Driver`). + /// - `user`: Database user name for authentication. + /// - `password`: Database password for authentication. + /// - `query`: A SQL query to use as the data source (alternative to `dbtable`). + /// - `partition_column`: Column used for partitioning reads. + /// - `lower_bound`: Lower bound of the partition column for parallel reads. + /// - `upper_bound`: Upper bound of the partition column for parallel reads. + /// - `num_partitions`: Number of partitions for parallel reads/writes. + /// - `fetch_size`: Number of rows to fetch per round trip. + /// - `batch_size`: Number of rows to insert per batch during writes. + /// - `isolation_level`: Transaction isolation level (e.g., `READ_COMMITTED`). + /// - `truncate`: Whether to truncate the table before writing (instead of dropping and recreating). + /// - `create_table_options`: Options appended to the CREATE TABLE statement. + /// - `create_table_column_types`: Column data types to use when creating the table. 
+    pub struct JdbcOptions {
+        url                       : String, camel_case = "url"
+        dbtable                   : String, camel_case = "dbtable"
+        driver                    : String, camel_case = "driver"
+        user                      : String, camel_case = "user"
+        password                  : String, camel_case = "password"
+        query                     : String, camel_case = "query"
+        partition_column          : String, camel_case = "partitionColumn"
+        lower_bound               : String, camel_case = "lowerBound"
+        upper_bound               : String, camel_case = "upperBound"
+        num_partitions            : i32,    camel_case = "numPartitions"
+        fetch_size                : i32,    camel_case = "fetchSize"
+        batch_size                : i32,    camel_case = "batchSize"
+        isolation_level           : String, camel_case = "isolationLevel"
+        truncate                  : bool,   camel_case = "truncate"
+        create_table_options      : String, camel_case = "createTableOptions"
+        create_table_column_types : String, camel_case = "createTableColumnTypes"
+    }
+}
+
 /// DataFrameReader represents the entrypoint to create a DataFrame
 /// from a specific file format.
 #[derive(Clone, Debug)]
@@ -558,6 +599,13 @@ impl DataFrameReader {
         self.read_options.extend(config.to_options());
         self.load(paths)
     }
+
+    /// Reads data from a JDBC data source with the specified options.
+    ///
+    /// NOTE(review): the `<C: ToOptions>` bound and `Result<DataFrame, SparkError>`
+    /// return type were reconstructed — the original patch text lost every
+    /// letter-initial angle-bracketed span (HTML-stripping mangle). Confirm the
+    /// bound name against the sibling reader methods before applying.
+    pub fn jdbc<C: ToOptions>(mut self, config: C) -> Result<DataFrame, SparkError> {
+        self.format = Some("jdbc".to_string());
+        self.read_options.extend(config.to_options());
+        self.load(Vec::<&str>::new())
+    }
 }
 
 /// DataFrameWriter provides the ability to output a [DataFrame]
@@ -764,6 +812,13 @@ impl DataFrameWriter {
         self.write_options.extend(config.to_options());
         self.save(path).await
     }
+
+    /// Writes data to a JDBC data source with the specified options.
+    ///
+    /// NOTE(review): `<C: ToOptions>` reconstructed for the same reason as the
+    /// reader above — the declaration of `C` was stripped in transit.
+    pub async fn jdbc<C: ToOptions>(mut self, config: C) -> Result<(), SparkError> {
+        self.format = Some("jdbc".to_string());
+        self.write_options.extend(config.to_options());
+        self.save("").await
+    }
 }
 
 pub struct DataFrameWriterV2 {