From d4aae82579c17b0519cad7def962f19b2841cab0 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Sun, 29 Mar 2026 09:31:51 +0200 Subject: [PATCH] [SPARK-52428] Add configurable gRPC connection and request timeouts --- crates/connect/src/client/builder.rs | 5 +++++ crates/connect/src/client/config.rs | 2 ++ crates/connect/src/session.rs | 27 ++++++++++++++++++++++++--- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/crates/connect/src/client/builder.rs b/crates/connect/src/client/builder.rs index e265384..5006035 100644 --- a/crates/connect/src/client/builder.rs +++ b/crates/connect/src/client/builder.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::env; use std::str::FromStr; +use std::time::Duration; use crate::errors::SparkError; @@ -43,6 +44,8 @@ pub struct ChannelBuilder { pub(super) user_agent: Option, pub(super) use_ssl: bool, pub(super) headers: Option>, + pub connect_timeout: Option, + pub request_timeout: Option, } impl Default for ChannelBuilder { @@ -172,6 +175,8 @@ impl ChannelBuilder { user_agent: ChannelBuilder::create_user_agent(None), use_ssl: false, headers: None, + connect_timeout: None, + request_timeout: None, }; if let Some(mut headers) = headers { diff --git a/crates/connect/src/client/config.rs b/crates/connect/src/client/config.rs index 6cd29ff..7f1cb76 100644 --- a/crates/connect/src/client/config.rs +++ b/crates/connect/src/client/config.rs @@ -115,6 +115,8 @@ impl From for ChannelBuilder { } else { Some(headers) }, + connect_timeout: None, + request_timeout: None, } } } diff --git a/crates/connect/src/session.rs b/crates/connect/src/session.rs index 94184a7..4b670d5 100644 --- a/crates/connect/src/session.rs +++ b/crates/connect/src/session.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use crate::client::{ChannelBuilder, Config, HeadersLayer, SparkClient, SparkConnectClient}; @@ -99,10 +100,30 @@ impl SparkSessionBuilder { self } + /// Sets the timeout for establishing the initial gRPC connection. + pub fn connect_timeout(mut self, timeout: Duration) -> Self { + self.channel_builder.connect_timeout = Some(timeout); + self + } + + /// Sets the timeout for individual gRPC requests. + pub fn request_timeout(mut self, timeout: Duration) -> Self { + self.channel_builder.request_timeout = Some(timeout); + self + } + async fn create_client(&self) -> Result { - let channel = Channel::from_shared(self.channel_builder.endpoint())? - .connect() - .await?; + let mut endpoint = Channel::from_shared(self.channel_builder.endpoint())?; + + if let Some(timeout) = self.channel_builder.connect_timeout { + endpoint = endpoint.connect_timeout(timeout); + } + + if let Some(timeout) = self.channel_builder.request_timeout { + endpoint = endpoint.timeout(timeout); + } + + let channel = endpoint.connect().await?; let channel = ServiceBuilder::new() .layer(HeadersLayer::new(