From 50c92aad3b26ba4c814ef2e825f4e8a6ff3bc4f2 Mon Sep 17 00:00:00 2001
From: Rafael Fernandez
Date: Sun, 29 Mar 2026 10:04:15 +0200
Subject: [PATCH] [SPARK-52428] Add SparkSession::add_artifact() for uploading
 files to the cluster

---
 crates/connect/src/client/mod.rs | 23 +++++++++++++++
 crates/connect/src/session.rs    | 50 ++++++++++++++++++++++++++++++++
 2 files changed, 73 insertions(+)

diff --git a/crates/connect/src/client/mod.rs b/crates/connect/src/client/mod.rs
index 7677d14..6db32ea 100644
--- a/crates/connect/src/client/mod.rs
+++ b/crates/connect/src/client/mod.rs
@@ -36,6 +36,8 @@ use arrow_ipc::reader::StreamReader;
 
 use uuid::Uuid;
 
+use futures_util::stream;
+
 use crate::errors::SparkError;
 
 mod builder;
@@ -358,6 +360,27 @@ where
         Ok(resp)
     }
 
+    pub async fn add_artifacts(
+        &self,
+        artifacts: Vec<spark::add_artifacts_request::SingleChunkArtifact>,
+    ) -> Result<spark::AddArtifactsResponse, SparkError> {
+        let batch = spark::add_artifacts_request::Batch { artifacts };
+
+        let req = spark::AddArtifactsRequest {
+            session_id: self.session_id(),
+            user_context: self.user_context.clone(),
+            client_type: self.builder.user_agent.clone(),
+            payload: Some(spark::add_artifacts_request::Payload::Batch(batch)),
+        };
+
+        let mut client = self.stub.write().await;
+
+        let stream = stream::once(async { req });
+        let resp = client.add_artifacts(stream).await?.into_inner();
+
+        Ok(resp)
+    }
+
     pub async fn interrupt_request(
         &self,
         interrupt_type: spark::interrupt_request::InterruptType,
diff --git a/crates/connect/src/session.rs b/crates/connect/src/session.rs
index 94184a7..a658953 100644
--- a/crates/connect/src/session.rs
+++ b/crates/connect/src/session.rs
@@ -230,6 +230,56 @@ impl SparkSession {
         self.client
     }
 
+    /// Upload a local file (JAR, Python file, etc.) to the Spark cluster.
+    ///
+    /// The file will be available in the session for use by Spark jobs.
+    ///
+    /// # Arguments
+    /// * `local_path` - Path to the local file to upload
+    /// * `remote_name` - Name/path for the artifact on the server (e.g., "jars/my-lib.jar")
+    pub async fn add_artifact(
+        &self,
+        local_path: &str,
+        remote_name: &str,
+    ) -> Result<spark::AddArtifactsResponse, SparkError> {
+        let data = std::fs::read(local_path)?;
+
+        let chunk = spark::add_artifacts_request::ArtifactChunk { data, crc: 0 };
+
+        let artifact = spark::add_artifacts_request::SingleChunkArtifact {
+            name: remote_name.to_string(),
+            data: Some(chunk),
+        };
+
+        self.client.add_artifacts(vec![artifact]).await
+    }
+
+    /// Upload multiple local files to the Spark cluster in a single batch.
+    ///
+    /// Each entry is a tuple of `(local_path, remote_name)`.
+    ///
+    /// # Arguments
+    /// * `artifacts` - A slice of `(local_path, remote_name)` tuples
+    pub async fn add_artifacts(
+        &self,
+        artifacts: &[(&str, &str)],
+    ) -> Result<spark::AddArtifactsResponse, SparkError> {
+        let mut single_chunks = Vec::with_capacity(artifacts.len());
+
+        for (local_path, remote_name) in artifacts {
+            let data = std::fs::read(local_path)?;
+
+            let chunk = spark::add_artifacts_request::ArtifactChunk { data, crc: 0 };
+
+            single_chunks.push(spark::add_artifacts_request::SingleChunkArtifact {
+                name: remote_name.to_string(),
+                data: Some(chunk),
+            });
+        }
+
+        self.client.add_artifacts(single_chunks).await
+    }
+
     /// Interrupt all operations of this session currently running on the connected server.
     pub async fn interrupt_all(&self) -> Result<Vec<String>, SparkError> {
         let resp = self
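
Below is a minimal usage sketch for the new methods; it is not part of the patch. It assumes the crate is consumed under its usual name (spark_connect_rs), that the session is created through the crate's SparkSessionBuilder::remote(...).build() flow with a tokio runtime, and the connection string, local file paths, and remote artifact names are invented purely for illustration.

// Usage sketch only (not part of the patch). Connection string, file paths,
// and remote artifact names are illustrative assumptions.
use spark_connect_rs::errors::SparkError;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), SparkError> {
    // Connect to a Spark Connect endpoint (address is an example).
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Upload a single file; `remote_name` decides how the artifact is
    // registered on the server side (e.g. under "jars/" or "pyfiles/").
    spark
        .add_artifact("./target/my-lib.jar", "jars/my-lib.jar")
        .await?;

    // Upload several files in one batched AddArtifacts request.
    spark
        .add_artifacts(&[
            ("./python/udfs.py", "pyfiles/udfs.py"),
            ("./python/helpers.py", "pyfiles/helpers.py"),
        ])
        .await?;

    Ok(())
}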