Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions crates/connect/src/readwriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,47 @@ define_file_options! {
}
}

define_file_options! {
    /// Options for configuring JDBC data source reads and writes.
    ///
    /// # Options
    ///
    /// - `url`: JDBC connection URL (e.g., `jdbc:postgresql://host:port/database`).
    /// - `dbtable`: The JDBC table to read from or write to. Can also be a subquery in parentheses.
    /// - `driver`: JDBC driver class name (e.g., `org.postgresql.Driver`).
    /// - `user`: Database user name for authentication.
    /// - `password`: Database password for authentication.
    /// - `query`: A SQL query to use as the data source (alternative to `dbtable`).
    /// - `partition_column`: Column used for partitioning reads.
    /// - `lower_bound`: Lower bound of the partition column for parallel reads.
    /// - `upper_bound`: Upper bound of the partition column for parallel reads.
    /// - `num_partitions`: Number of partitions for parallel reads/writes.
    /// - `fetch_size`: Number of rows to fetch per round trip.
    /// - `batch_size`: Number of rows to insert per batch during writes.
    /// - `isolation_level`: Transaction isolation level (e.g., `READ_COMMITTED`).
    /// - `truncate`: Whether to truncate the table before writing (instead of dropping and recreating).
    /// - `create_table_options`: Options appended to the CREATE TABLE statement.
    /// - `create_table_column_types`: Column data types to use when creating the table.
    ///
    /// NOTE(review): per Spark's JDBC data source documentation, `dbtable` and
    /// `query` are mutually exclusive, and `partition_column`, `lower_bound`,
    /// `upper_bound`, and `num_partitions` must be specified together — confirm
    /// whether the server validates this or whether client-side checks are wanted.
    pub struct JdbcOptions {
        // Each field maps the snake_case Rust name to the camelCase/lowercase
        // option key Spark expects on the wire (via the `camel_case` attribute).
        url : String, camel_case = "url"
        dbtable : String, camel_case = "dbtable"
        driver : String, camel_case = "driver"
        user : String, camel_case = "user"
        password : String, camel_case = "password"
        query : String, camel_case = "query"
        partition_column : String, camel_case = "partitionColumn"
        lower_bound : String, camel_case = "lowerBound"
        upper_bound : String, camel_case = "upperBound"
        num_partitions : i32, camel_case = "numPartitions"
        fetch_size : i32, camel_case = "fetchSize"
        batch_size : i32, camel_case = "batchSize"
        isolation_level : String, camel_case = "isolationLevel"
        truncate : bool, camel_case = "truncate"
        create_table_options : String, camel_case = "createTableOptions"
        create_table_column_types : String, camel_case = "createTableColumnTypes"
    }
}

/// DataFrameReader represents the entrypoint to create a DataFrame
/// from a specific file format.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -558,6 +599,13 @@ impl DataFrameReader {
self.read_options.extend(config.to_options());
self.load(paths)
}

/// Reads data from a JDBC data source with the specified options.
///
/// Sets the source format to `"jdbc"`, merges the supplied configuration
/// into the accumulated read options, and issues the load with an empty
/// path list — JDBC sources are addressed entirely through options
/// (`url`, `dbtable`/`query`, …), not file paths.
pub fn jdbc<C: ConfigOpts>(mut self, config: C) -> Result<DataFrame, SparkError> {
    self.format = Some(String::from("jdbc"));
    let jdbc_options = config.to_options();
    self.read_options.extend(jdbc_options);
    let no_paths: Vec<&str> = Vec::new();
    self.load(no_paths)
}
}

/// DataFrameWriter provides the ability to output a [DataFrame]
Expand Down Expand Up @@ -764,6 +812,13 @@ impl DataFrameWriter {
self.write_options.extend(config.to_options());
self.save(path).await
}

/// Writes data to a JDBC data source with the specified options.
///
/// Sets the output format to `"jdbc"`, merges the supplied configuration
/// into the accumulated write options, and runs the save.
///
/// NOTE(review): the save is invoked with an empty path string since JDBC
/// targets are selected via options rather than a filesystem path — confirm
/// `save("")` does not attach a spurious empty `path` to the write plan.
pub async fn jdbc<C: ConfigOpts>(mut self, config: C) -> Result<(), SparkError> {
    self.format = Some(String::from("jdbc"));
    let jdbc_options = config.to_options();
    self.write_options.extend(jdbc_options);
    self.save("").await
}
}

pub struct DataFrameWriterV2 {
Expand Down