From 27324009fac00af306b10ffd5df38118cdb0208d Mon Sep 17 00:00:00 2001 From: kination Date: Sun, 22 Feb 2026 09:30:12 +0900 Subject: [PATCH 1/2] Apply vortex API on read/write for arrow --- vine-core/src/storage_reader.rs | 131 +------- vine-core/src/streaming_writer.rs | 124 ------- vine-core/src/streaming_writer_v2.rs | 23 +- vine-core/src/vine_batch_writer.rs | 16 +- vine-core/src/vine_streaming_writer.rs | 9 +- vine-core/src/vortex_exp.rs | 428 +------------------------ 6 files changed, 47 insertions(+), 684 deletions(-) delete mode 100644 vine-core/src/streaming_writer.rs diff --git a/vine-core/src/storage_reader.rs b/vine-core/src/storage_reader.rs index 7d16fee..5e3ba36 100644 --- a/vine-core/src/storage_reader.rs +++ b/vine-core/src/storage_reader.rs @@ -1,30 +1,22 @@ -/// Reads data from Vortex (.vtx) files in date-partitioned directories. -/// Caching is handled internally. - use std::fs; use std::path::PathBuf; use chrono::NaiveDate; +use vortex::ArrayRef as VortexArrayRef; + use crate::global_cache; use crate::metadata::Metadata; -use crate::vortex_exp::{read_vortex_file, array_to_csv_rows}; +use crate::vortex_exp::read_vortex_file; -// TODO: Used by direct conversion (currently disabled) -#[allow(unused_imports)] -use vortex::{ArrayRef as VortexArrayRef}; - -/// Read all data from Vine storage -/// -/// This is the main entry point for reading Vine data. -/// Caching is handled internally. +/// Read vine data as Vortex arrays /// /// # Arguments /// * `dir_path` - Base directory containing date-partitioned Vortex files /// /// # Returns -/// Vector of CSV-formatted row strings -pub fn read_vine_data(dir_path: &str) -> Vec { +/// Vector of Vortex ArrayRef +pub fn read_vine_data(dir_path: &str) -> Vec { read_vine_data_internal(dir_path) .unwrap_or_else(|e| { eprintln!("Error reading Vine data: {}", e); @@ -33,97 +25,16 @@ pub fn read_vine_data(dir_path: &str) -> Vec { } /// Internal implementation with automatic caching -fn read_vine_data_internal(dir_path: &str) -> Result, Box> { - // Use global cache to get metadata +fn read_vine_data_internal(dir_path: &str) -> Result, Box> { let metadata = global_cache::get_reader_metadata(dir_path)?; read_with_metadata(dir_path, &metadata) } -/// Read data using provided metadata -fn read_with_metadata(dir_path: &str, metadata: &Metadata) -> Result, Box> { - let base_path = PathBuf::from(dir_path); - - let mut all_rows = Vec::new(); - let mut directories = Vec::new(); - - // Scan for date-partitioned directories - let dir_entries = fs::read_dir(&base_path)?; - - for entry_result in dir_entries { - let entry = entry_result?; - let path = entry.path(); - - if path.is_dir() { - if let Some(dir_name) = path.file_name().and_then(|s| s.to_str()) { - if let Ok(date) = NaiveDate::parse_from_str(dir_name, "%Y-%m-%d") { - directories.push((date, path)); - } - } - } - } - - // Sort directories by date (chronological order) - directories.sort_by_key(|(date, _)| *date); - - // Read all Vortex files from date directories - for (_, dir_path) in directories { - let sub_dir = fs::read_dir(&dir_path)?; - - for file_entry_result in sub_dir { - let file_path = file_entry_result?.path(); - - // Process .vtx files only - if file_path.extension().map_or(false, |ext| ext == "vtx") { - if let Err(e) = read_vortex_file_to_rows(&file_path, metadata, &mut all_rows) { - eprintln!("Warning: Failed to read file {:?}: {}", file_path, e); - } - } - } - } - - Ok(all_rows) -} - -/// Read single Vortex file and append rows to row_list -fn read_vortex_file_to_rows( - file_path: &PathBuf, - metadata: &Metadata, - row_list: &mut Vec, -) -> Result<(), Box> { - let (_, array) = read_vortex_file(file_path) - .map_err(|e| -> Box { e })?; - let rows = array_to_csv_rows(&array, metadata) - .map_err(|e| -> Box { e })?; - row_list.extend(rows); - Ok(()) -} - -// ============================================================================ -// Direct Vortex Array Reading (No CSV conversion) -// ============================================================================ -// -// TODO: Part of direct Arrow ↔ Vortex conversion (currently disabled) -// See arrow_bridge.rs for status and implementation plan -// - -#[cfg(feature = "direct-vortex-conversion")] -/// Read all data from Vine storage as a combined Vortex array -/// -/// **TODO: Currently unused - part of direct conversion implementation** -/// -/// This function reads all date-partitioned Vortex files and combines them -/// into a single StructArray. No CSV conversion is performed. -/// -/// # Arguments -/// * `dir_path` - Base directory containing date-partitioned Vortex files -/// -/// # Returns -/// Combined VortexArrayRef containing all data -pub fn read_vine_vortex_array(dir_path: &str) -> Result> { +/// Read data with provided metadata +fn read_with_metadata(dir_path: &str, _metadata: &Metadata) -> Result, Box> { let base_path = PathBuf::from(dir_path); - let metadata = global_cache::get_reader_metadata(dir_path)?; - let mut all_arrays = Vec::new(); + let mut all_arrays: Vec = Vec::new(); let mut directories = Vec::new(); // Scan for date-partitioned directories @@ -155,24 +66,16 @@ pub fn read_vine_vortex_array(dir_path: &str) -> Result all_arrays.push(array), - Err(e) => eprintln!("Warning: Failed to read file {:?}: {}", file_path, e), + Ok((_dtype, array)) => { + all_arrays.push(array); + } + Err(e) => { + eprintln!("Warning: Failed to read file {:?}: {}", file_path, e); + } } } } } - if all_arrays.is_empty() { - return Err("No Vortex files found".into()); - } - - // If only one array, return it directly - if all_arrays.len() == 1 { - return Ok(all_arrays.into_iter().next().unwrap()); - } - - // Combine multiple arrays using Vortex concat - // For now, return the first array (full concat implementation would require Vortex concat API) - // TODO: Implement proper array concatenation - Ok(all_arrays.into_iter().next().unwrap()) + Ok(all_arrays) } diff --git a/vine-core/src/streaming_writer.rs b/vine-core/src/streaming_writer.rs deleted file mode 100644 index 4d259b7..0000000 --- a/vine-core/src/streaming_writer.rs +++ /dev/null @@ -1,124 +0,0 @@ -/// High-performance streaming writer optimized for continuous data ingestion. -/// Uses Vortex format (.vtx) for storage. -use std::fs; -use std::path::PathBuf; - -use chrono::Local; - -use crate::global_cache; -use crate::metadata::Metadata; -use crate::vortex_exp::write_vortex_file; -use crate::writer_config::WriterConfig; - -/// High-performance streaming writer for Vine format -/// -/// Optimized for: -/// - Continuous data streams -/// - Batched writes -/// - Automatic file rotation -/// -/// Caching is handled internally. -#[deprecated] -pub struct StreamingWriter { - base_path: PathBuf, - metadata: Metadata, - config: WriterConfig, - buffer: Vec, -} - -#[deprecated] -impl StreamingWriter { - /// Create new streaming writer - /// - /// Uses internal caching for metadata. - pub fn new(base_path: PathBuf) -> Result> { - let path_str = base_path.to_str().unwrap_or(""); - - // Use global cache to get metadata - let metadata = global_cache::get_writer_metadata(path_str)?; - - Ok(Self { - base_path, - metadata, - config: WriterConfig::default(), - buffer: Vec::new(), - }) - } - - /// Create with custom configuration - pub fn with_config(base_path: PathBuf, config: WriterConfig) -> Result> { - let path_str = base_path.to_str().unwrap_or(""); - let metadata = global_cache::get_writer_metadata(path_str)?; - - Ok(Self { - base_path, - metadata, - config, - buffer: Vec::new(), - }) - } - - /// Write batch of rows (CSV format) - pub fn write_batch(&mut self, rows: &[&str]) -> Result<(), Box> { - if rows.is_empty() { - return Ok(()); - } - - // Add rows to buffer - for row in rows { - self.buffer.push(row.to_string()); - } - - // Check if we should flush - if self.buffer.len() >= self.config.max_rows_per_file { - self.flush_buffer()?; - } - - Ok(()) - } - - /// Flush buffer to file - fn flush_buffer(&mut self) -> Result<(), Box> { - if self.buffer.is_empty() { - return Ok(()); - } - - // Create date partition directory - let now = Local::now(); - let date_str = now.format("%Y-%m-%d").to_string(); - let partition_dir = self.base_path.join(&date_str); - fs::create_dir_all(&partition_dir)?; - - // Generate filename with microsecond precision - let time_str = now.format("%H%M%S").to_string(); - let micros = now.timestamp_subsec_micros(); - let file_path = partition_dir.join(format!("data_{}_{}.vtx", time_str, micros)); - - // Convert buffer to &str slice for write_vortex_file - let rows: Vec<&str> = self.buffer.iter().map(|s| s.as_str()).collect(); - write_vortex_file(&file_path, &self.metadata, &rows) - .map_err(|e| -> Box { e })?; - - // Clear buffer - self.buffer.clear(); - - Ok(()) - } - - /// Flush and close writer - pub fn close(mut self) -> Result<(), Box> { - self.flush_buffer()?; - Ok(()) - } - - /// Flush current data (write to file) - pub fn flush(&mut self) -> Result<(), Box> { - self.flush_buffer() - } -} - -impl Drop for StreamingWriter { - fn drop(&mut self) { - let _ = self.flush_buffer(); - } -} diff --git a/vine-core/src/streaming_writer_v2.rs b/vine-core/src/streaming_writer_v2.rs index a960133..750615f 100644 --- a/vine-core/src/streaming_writer_v2.rs +++ b/vine-core/src/streaming_writer_v2.rs @@ -23,7 +23,7 @@ use vortex::IntoArray; use crate::global_cache; use crate::metadata::Metadata; -use crate::vortex_exp::{build_struct_array, create_session}; +use crate::vortex_exp::create_session; use crate::writer_config::WriterConfig; /// Summary of a flush operation @@ -97,29 +97,26 @@ impl StreamingWriterV2 { }) } - /// Write batch of rows (CSV format) + /// Write batch of Vortex array data /// - /// Convert rows(csv) to Vortex arrays, and accumulated in memory. - /// `flush()` to write accumulated chunks to disk. - pub fn write_batch(&mut self, rows: &[&str]) -> Result<(), Box> { - if rows.is_empty() { + /// Accepts vortex array directly and accumulates it in memory. + /// Call `flush()` to write accumulated chunks to disk. + pub fn write_batch(&mut self, array: &ArrayRef) -> Result<(), Box> { + let num_rows = array.len(); + if num_rows == 0 { return Ok(()); } // Check if adding current rows would exceed limit - if self.current_buffer_rows + rows.len() > self.config.max_rows_per_file + if self.current_buffer_rows + num_rows > self.config.max_rows_per_file && !self.chunk_buffer.is_empty() { // Flush before exceeding self.flush()?; } - // Convert 'csv' rows to 'Vortex array' - let array = build_struct_array(&self.metadata, rows) - .map_err(|e| -> Box { e })?; - - self.current_buffer_rows += rows.len(); - self.chunk_buffer.push(array); + self.current_buffer_rows += num_rows; + self.chunk_buffer.push(array.clone()); Ok(()) } diff --git a/vine-core/src/vine_batch_writer.rs b/vine-core/src/vine_batch_writer.rs index bde2628..1c18b7d 100644 --- a/vine-core/src/vine_batch_writer.rs +++ b/vine-core/src/vine_batch_writer.rs @@ -2,27 +2,23 @@ use std::fs; use std::path::{Path, PathBuf}; use chrono::Local; +use vortex::ArrayRef as VortexArrayRef; -use crate::global_cache; -use crate::vortex_exp::write_vortex_file; +use crate::vortex_exp::write_vortex_array; /// Batch writer for bulk data ingestion /// -/// Writes all data in a single operation. +/// Writes all data in a single operation using Vortex arrays directly. /// Caching is handled internally. pub struct VineBatchWriter; impl VineBatchWriter { - /// Write all data at once + /// Write Vortex array directly to storage pub fn write>( path: P, - data: &[&str], + array: &VortexArrayRef, ) -> Result<(), Box> { let base_path: PathBuf = PathBuf::from(path.as_ref()); - let path_str = base_path.to_str().unwrap_or(""); - - // Use global cache to get metadata - let metadata = global_cache::get_writer_metadata(path_str)?; // Create date partition directory let date_str = Local::now().format("%Y-%m-%d").to_string(); @@ -34,7 +30,7 @@ impl VineBatchWriter { let file_path = partition_dir.join(format!("data_{}.vtx", timestamp)); // Write Vortex file - write_vortex_file(&file_path, &metadata, data) + write_vortex_array(&file_path, array) .map_err(|e| -> Box { e })?; Ok(()) diff --git a/vine-core/src/vine_streaming_writer.rs b/vine-core/src/vine_streaming_writer.rs index 486cd5e..0ee1ee5 100644 --- a/vine-core/src/vine_streaming_writer.rs +++ b/vine-core/src/vine_streaming_writer.rs @@ -1,6 +1,7 @@ use std::path::{Path, PathBuf}; -// use crate::streaming_writer::StreamingWriter; +use vortex::ArrayRef as VortexArrayRef; + use crate::streaming_writer_v2::StreamingWriterV2 as StreamingWriter; use crate::writer_config::WriterConfig; @@ -27,9 +28,9 @@ impl VineStreamingWriter { Ok(Self { inner: writer }) } - /// Append batch of rows to the stream - pub fn append_batch(&mut self, rows: &[&str]) -> Result<(), Box> { - self.inner.write_batch(rows) + /// Append batch of Vortex array data to the stream + pub fn append_batch(&mut self, array: &VortexArrayRef) -> Result<(), Box> { + self.inner.write_batch(array) } /// Flush pending writes (closes current file, opens new file on next write) diff --git a/vine-core/src/vortex_exp.rs b/vine-core/src/vortex_exp.rs index 81e02f8..46a02df 100644 --- a/vine-core/src/vortex_exp.rs +++ b/vine-core/src/vortex_exp.rs @@ -1,7 +1,6 @@ /// Provides Vortex-based file I/O for Vine. /// - DType conversion between Vine metadata and Vortex /// - File read/write with date partitioning -/// - CSV ↔ Vortex array conversion for JNI /// use std::path::Path; @@ -239,7 +238,7 @@ pub fn create_session() -> VortexSession { /// # Arguments /// * `path` - Output file path /// * `metadata` - Vine metadata schema -/// * `rows` - Data rows as CSV-like strings (comma-separated values) +/// * `rows` - Data rows as comma-separated strings /// /// # Example /// ```ignore @@ -392,7 +391,7 @@ fn build_typed_array(type_str: &str, values: &[&str]) -> VortexResult Ok(builder.finish().into_array()) } - // Binary (base64 encoded in CSV) + // Binary (base64 encoded) "binary" => { let mut builder = VarBinViewBuilder::with_capacity( DType::Binary(Nullability::Nullable), @@ -533,391 +532,13 @@ pub fn get_row_count(array: &ArrayRef) -> usize { array.len() } -/// Extract column values as strings based on type -fn extract_column_values(child: &ArrayRef, type_str: &str, num_rows: usize) -> Vec { - use vortex::ToCanonical; - - match type_str.to_lowercase().as_str() { - // Integer types - "byte" | "tinyint" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let val: i8 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0); - val.to_string() - }) - .collect() - } - "short" | "smallint" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let val: i16 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0); - val.to_string() - }) - .collect() - } - "integer" | "int" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let val: i32 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0); - val.to_string() - }) - .collect() - } - "long" | "bigint" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let val: i64 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0); - val.to_string() - }) - .collect() - } - - // Floating point types - "float" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let val: f32 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0.0); - val.to_string() - }) - .collect() - } - "double" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let val: f64 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0.0); - val.to_string() - }) - .collect() - } - - // Boolean - "boolean" | "bool" => { - let bool_arr = child.to_bool(); - (0..num_rows) - .map(|i| { - let val: bool = bool_arr.scalar_at(i).as_ref().try_into().unwrap_or(false); - val.to_string() - }) - .collect() - } - - // String and Decimal (both stored as UTF8) - "string" | "decimal" => { - (0..num_rows) - .map(|i| { - let scalar = child.scalar_at(i); - scalar.as_utf8().value().map(|s| s.to_string()).unwrap_or_default() - }) - .collect() - } - - // Binary (base64 encode for CSV) - "binary" => { - (0..num_rows) - .map(|i| { - let scalar = child.scalar_at(i); - if let Some(bytes) = scalar.as_binary().value() { - base64_encode(&bytes) - } else { - String::new() - } - }) - .collect() - } - - // Date (days since epoch -> YYYY-MM-DD) - "date" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let days: i32 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0); - days_to_date_string(days) - }) - .collect() - } - - // Timestamp (millis since epoch -> ISO format) - "timestamp" => { - let prim = child.to_primitive(); - (0..num_rows) - .map(|i| { - let millis: i64 = prim.scalar_at(i).as_ref().try_into().unwrap_or(0); - millis_to_timestamp_string(millis) - }) - .collect() - } - - // Fallback - _ => (0..num_rows).map(|_| String::new()).collect(), - } -} - -/// Convert days since epoch to date string (YYYY-MM-DD) -fn days_to_date_string(days: i32) -> String { - use chrono::NaiveDate; - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); - if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) { - date.format("%Y-%m-%d").to_string() - } else { - "1970-01-01".to_string() - } -} - -/// Convert milliseconds since epoch to timestamp string -fn millis_to_timestamp_string(millis: i64) -> String { - use chrono::{TimeZone, Utc}; - if let Some(dt) = Utc.timestamp_millis_opt(millis).single() { - dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string() - } else { - "1970-01-01 00:00:00.000".to_string() - } -} - -/// Simple base64 encode (for binary data in CSV) -fn base64_encode(bytes: &[u8]) -> String { - const ENCODE_TABLE: &[u8; 64] = - b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - - let mut result = String::with_capacity((bytes.len() + 2) / 3 * 4); - let mut i = 0; - - while i + 2 < bytes.len() { - let n = ((bytes[i] as u32) << 16) | ((bytes[i + 1] as u32) << 8) | (bytes[i + 2] as u32); - result.push(ENCODE_TABLE[((n >> 18) & 0x3F) as usize] as char); - result.push(ENCODE_TABLE[((n >> 12) & 0x3F) as usize] as char); - result.push(ENCODE_TABLE[((n >> 6) & 0x3F) as usize] as char); - result.push(ENCODE_TABLE[(n & 0x3F) as usize] as char); - i += 3; - } - - if i + 1 == bytes.len() { - let n = (bytes[i] as u32) << 16; - result.push(ENCODE_TABLE[((n >> 18) & 0x3F) as usize] as char); - result.push(ENCODE_TABLE[((n >> 12) & 0x3F) as usize] as char); - result.push('='); - result.push('='); - } else if i + 2 == bytes.len() { - let n = ((bytes[i] as u32) << 16) | ((bytes[i + 1] as u32) << 8); - result.push(ENCODE_TABLE[((n >> 18) & 0x3F) as usize] as char); - result.push(ENCODE_TABLE[((n >> 12) & 0x3F) as usize] as char); - result.push(ENCODE_TABLE[((n >> 6) & 0x3F) as usize] as char); - result.push('='); - } - - result -} - -/// Convert ArrayRef to CSV-formatted rows for JNI compatibility -/// -/// This is the reverse of build_struct_array - extracts data from Vortex arrays -/// and converts back to CSV format for JNI layer. -pub fn array_to_csv_rows(array: &ArrayRef, metadata: &Metadata) -> VortexResult> { - use vortex::ToCanonical; - - let struct_array = array.to_struct(); - let num_rows = struct_array.len(); - let mut rows = Vec::with_capacity(num_rows); - - // Extract each column as canonical array for value access - let mut column_values: Vec> = Vec::with_capacity(metadata.fields.len()); - - // Get all fields from StructArray - let fields = struct_array.fields(); - - for (col_idx, field) in metadata.fields.iter().enumerate() { - let child = fields.get(col_idx) - .ok_or_else(|| format!("Missing field at index {}", col_idx))?; - - let values = extract_column_values(child, &field.data_type, num_rows); - column_values.push(values); - } - - // Transpose: column-oriented -> row-oriented - for row_idx in 0..num_rows { - let row: Vec = column_values.iter() - .map(|col| col[row_idx].clone()) - .collect(); - rows.push(row.join(",")); - } - - Ok(rows) -} - -/// Read all Vortex files from a directory and return CSV rows -/// -/// Scans date-partitioned directories (YYYY-MM-DD format) and reads all .vtx files. -/// Returns data as CSV-formatted strings for JNI compatibility. -pub fn read_vine_vortex_data(dir_path: &str) -> VortexResult> { - use std::fs; - use std::path::PathBuf; - use chrono::NaiveDate; - - let base_path = PathBuf::from(dir_path); - - // Load metadata from vine_meta.json - let meta_path = base_path.join("vine_meta.json"); - let metadata = Metadata::load(&meta_path) - .map_err(|e| format!("Failed to load metadata: {}", e))?; - - let mut all_rows = Vec::new(); - let mut directories = Vec::new(); - - // Scan for date-partitioned directories - let dir_entries = fs::read_dir(&base_path) - .map_err(|e| format!("Cannot read directory {:?}: {}", base_path, e))?; - - for entry_result in dir_entries { - let entry = entry_result.map_err(|e| format!("Cannot read entry: {}", e))?; - let path = entry.path(); - - if path.is_dir() { - if let Some(dir_name) = path.file_name().and_then(|s| s.to_str()) { - if let Ok(date) = NaiveDate::parse_from_str(dir_name, "%Y-%m-%d") { - directories.push((date, path)); - } - } - } - } - - // Sort directories by date - directories.sort_by_key(|(date, _)| *date); - - // Read all Vortex files from date directories - for (_, dir_path) in directories { - let sub_dir = fs::read_dir(&dir_path) - .map_err(|e| format!("Cannot read directory {:?}: {}", dir_path, e))?; - - for file_entry_result in sub_dir { - let file_path = file_entry_result - .map_err(|e| format!("Cannot read file entry: {}", e))? - .path(); - - // Process .vtx files only - if file_path.extension().map_or(false, |ext| ext == "vtx") { - match read_vortex_file(&file_path) { - Ok((_, array)) => { - match array_to_csv_rows(&array, &metadata) { - Ok(rows) => all_rows.extend(rows), - Err(e) => eprintln!("Warning: Failed to convert {:?}: {}", file_path, e), - } - } - Err(e) => eprintln!("Warning: Failed to read {:?}: {}", file_path, e), - } - } - } - } - - Ok(all_rows) -} - -/// Write data to Vortex format with date partitioning -/// -/// Creates date-partitioned directory structure and writes data as .vtx files. -/// Compatible with existing Vine storage layout. -pub fn write_vine_vortex_data>( - base_path: P, - rows: &[&str], -) -> VortexResult { - use std::fs; - use chrono::Local; - - let base = base_path.as_ref(); - - // Load metadata - let meta_path = base.join("vine_meta.json"); - let metadata = Metadata::load(&meta_path) - .map_err(|e| format!("Failed to load metadata: {}", e))?; - - // Create date partition directory - let date_str = Local::now().format("%Y-%m-%d").to_string(); - let partition_dir = base.join(&date_str); - fs::create_dir_all(&partition_dir) - .map_err(|e| format!("Failed to create partition dir: {}", e))?; - - // Generate filename with microsecond precision - let timestamp = Local::now().format("%H%M%S_%f").to_string(); - let file_path = partition_dir.join(format!("data_{}.vtx", timestamp)); - - // Write using existing function - write_vortex_file(&file_path, &metadata, rows) -} // ============================================================================ -// Helper functions for direct Arrow ↔ Vortex conversion +// Direct Array Write (used by vine_batch_writer) // ============================================================================ -// -// TODO: Part of direct Arrow ↔ Vortex conversion (currently disabled) -// See arrow_bridge.rs for status and implementation plan -// - -#[cfg(feature = "direct-vortex-conversion")] -/// Extract string value from Vortex array at given index -/// -/// **TODO: Currently unused - part of direct conversion implementation** -/// -/// Used by Arrow bridge for direct Vortex → Arrow conversion -pub fn extract_string_value(array: &ArrayRef, index: usize) -> VortexResult { - use vortex::ToCanonical; - - if !array.is_valid(index) { - return Ok(String::new()); - } - - // Convert to canonical VarBin form - let canonical = array.to_canonical() - .map_err(|e| format!("Failed to convert to canonical: {}", e))?; - - // Try to extract as VarBin (string) - if let Ok(varbin) = canonical.as_varbin_view() { - if let Some(bytes) = varbin.bytes_at(index) { - return String::from_utf8(bytes.into()) - .map_err(|e| format!("Failed to decode UTF-8: {}", e).into()); - } - } - - Ok(String::new()) -} - -#[cfg(feature = "direct-vortex-conversion")] -/// Extract binary value from Vortex array at given index -/// -/// **TODO: Currently unused - part of direct conversion implementation** -/// -/// Used by Arrow bridge for direct Vortex → Arrow conversion -pub fn extract_binary_value(array: &ArrayRef, index: usize) -> VortexResult> { - use vortex::ToCanonical; - - if !array.is_valid(index) { - return Ok(Vec::new()); - } - // Convert to canonical VarBin form - let canonical = array.to_canonical() - .map_err(|e| format!("Failed to convert to canonical: {}", e))?; - - // Try to extract as VarBin (binary) - if let Ok(varbin) = canonical.as_varbin_view() { - if let Some(bytes) = varbin.bytes_at(index) { - return Ok(bytes.into()); - } - } - - Ok(Vec::new()) -} - -#[cfg(feature = "direct-vortex-conversion")] -/// Write Vortex array directly to file (no CSV conversion) -/// -/// **TODO: Currently unused - part of direct conversion implementation** +/// Write Vortex array directly to file /// -/// This is used by the direct Arrow → Vortex path. -/// Accepts a Vortex StructArray and writes it directly to a .vtx file. pub fn write_vortex_array>( file_path: P, vortex_array: &ArrayRef, @@ -926,44 +547,13 @@ pub fn write_vortex_array>( let session = create_session(); rt.block_on(async { - let write_options = session.default_write_options(); - let file = session.create(file_path.as_ref()).await?; - let mut writer = write_options.open(file).await?; - - writer.write_array_columns(vortex_array.clone()).await?; - - let layout_size = writer.finalize().await?; - Ok(layout_size) + let file = async_fs::File::create(file_path.as_ref()).await?; + let write_options = session.write_options(); + let stream = vortex_array.clone().to_array_stream(); + let summary = write_options.write(file, stream).await?; + Ok(summary.size()) }) } -#[cfg(feature = "direct-vortex-conversion")] -/// Write Vortex array to date-partitioned Vine storage (direct, no CSV) -/// -/// **TODO: Currently unused - part of direct conversion implementation** -/// -/// This is the optimized write path that accepts Vortex arrays directly. -/// Used by Arrow IPC functions for maximum performance. -pub fn write_vine_vortex_array>( - base_path: P, - vortex_array: &ArrayRef, -) -> VortexResult { - use std::fs; - use chrono::Local; - let base = base_path.as_ref(); - - // Create date partition directory - let date_str = Local::now().format("%Y-%m-%d").to_string(); - let partition_dir = base.join(&date_str); - fs::create_dir_all(&partition_dir) - .map_err(|e| format!("Failed to create partition dir: {}", e))?; - - // Generate filename with microsecond precision - let timestamp = Local::now().format("%H%M%S_%f").to_string(); - let file_path = partition_dir.join(format!("data_{}.vtx", timestamp)); - - // Write directly - write_vortex_array(&file_path, vortex_array) -} From ffa99076ed8796c51ec91ea28adbbfec75ea571b Mon Sep 17 00:00:00 2001 From: kination Date: Sun, 22 Feb 2026 09:44:30 +0900 Subject: [PATCH 2/2] Remove old conversion logics for legacy writing --- vine-core/Cargo.lock | 226 ++++------ vine-core/Cargo.toml | 19 +- vine-core/src/arrow_bridge.rs | 751 ++++------------------------------ vine-core/src/lib.rs | 128 +++--- 4 files changed, 219 insertions(+), 905 deletions(-) diff --git a/vine-core/Cargo.lock b/vine-core/Cargo.lock index 77a55fb..7a25238 100644 --- a/vine-core/Cargo.lock +++ b/vine-core/Cargo.lock @@ -114,30 +114,14 @@ version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "num", ] -[[package]] -name = "arrow-array" -version = "53.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d45fe6d3faed0435b7313e59a02583b14c6c6339fa7729e94c32a20af319a79" -dependencies = [ - "ahash", - "arrow-buffer 53.4.1", - "arrow-data 53.4.1", - "arrow-schema 53.4.1", - "chrono", - "half", - "hashbrown 0.15.5", - "num", -] - [[package]] name = "arrow-array" version = "56.2.0" @@ -145,26 +129,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" dependencies = [ "ahash", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "half", "hashbrown 0.16.1", "num", ] -[[package]] -name = "arrow-buffer" -version = "53.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b5c681a99606f3316f2a99d9c8b6fa3aad0b1d34d8f6d7a1b471893940219d8" -dependencies = [ - "bytes", - "half", - "num", -] - [[package]] name = "arrow-buffer" version = "56.2.0" @@ -178,15 +151,15 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.4.0" +version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c73c6233c5b5d635a56f6010e6eb1ab9e30e94707db21cea03da317f67d84cf3" +checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" dependencies = [ - "arrow-array 53.4.0", - "arrow-buffer 53.4.1", - "arrow-data 53.4.1", - "arrow-schema 53.4.1", - "arrow-select 53.4.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "atoi", "base64", "chrono", @@ -196,42 +169,30 @@ dependencies = [ "ryu", ] -[[package]] -name = "arrow-data" -version = "53.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd962fc3bf7f60705b25bcaa8eb3318b2545aa1d528656525ebdd6a17a6cd6fb" -dependencies = [ - "arrow-buffer 53.4.1", - "arrow-schema 53.4.1", - "half", - "num", -] - [[package]] name = "arrow-data" version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" dependencies = [ - "arrow-buffer 56.2.0", - "arrow-schema 56.2.0", + "arrow-buffer", + "arrow-schema", "half", "num", ] [[package]] name = "arrow-ipc" -version = "53.4.0" +version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0270dc511f11bb5fa98a25020ad51a99ca5b08d8a8dfbd17503bb9dba0388f0b" +checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" dependencies = [ - "arrow-array 53.4.0", - "arrow-buffer 53.4.1", - "arrow-cast", - "arrow-data 53.4.1", - "arrow-schema 53.4.1", - "flatbuffers 24.12.23", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers", ] [[package]] @@ -240,40 +201,20 @@ version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", ] -[[package]] -name = "arrow-schema" -version = "53.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b0f9c0c3582dd55db0f136d3b44bfa0189df07adcf7dc7f2f2e74db0f52eb8" - [[package]] name = "arrow-schema" version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" dependencies = [ - "bitflags 2.10.0", -] - -[[package]] -name = "arrow-select" -version = "53.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7471ba126d0b0aaa24b50a36bc6c25e4e74869a1fd1a5553357027a0b1c8d1f1" -dependencies = [ - "ahash", - "arrow-array 53.4.0", - "arrow-buffer 53.4.1", - "arrow-data 53.4.1", - "arrow-schema 53.4.1", - "num", + "bitflags", ] [[package]] @@ -283,10 +224,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" dependencies = [ "ahash", - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "num", ] @@ -296,11 +237,11 @@ version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "memchr", "num", "regex", @@ -511,12 +452,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.10.0" @@ -905,23 +840,13 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "flatbuffers" -version = "24.12.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096" -dependencies = [ - "bitflags 1.3.2", - "rustc_version", -] - [[package]] name = "flatbuffers" version = "25.12.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3" dependencies = [ - "bitflags 2.10.0", + "bitflags", "rustc_version", ] @@ -1105,12 +1030,6 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -[[package]] -name = "hashbrown" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" - [[package]] name = "hashbrown" version = "0.16.1" @@ -2009,7 +1928,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.10.0", + "bitflags", ] [[package]] @@ -2062,7 +1981,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ - "bitflags 2.10.0", + "bitflags", "errno", "libc", "linux-raw-sys", @@ -2448,11 +2367,12 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" name = "vine-core" version = "0.2.0" dependencies = [ - "arrow-array 53.4.0", - "arrow-buffer 53.4.1", - "arrow-data 53.4.1", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", "arrow-ipc", - "arrow-schema 53.4.1", + "arrow-schema", "async-fs", "base64", "chrono", @@ -2532,19 +2452,19 @@ checksum = "66ba62607af32da3a08c0d6eea4b913547e5febe31c75f6f3e718d95b1721e55" dependencies = [ "arcref", "arrow-arith", - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-data 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-data", "arrow-ord", - "arrow-schema 56.2.0", - "arrow-select 56.2.0", + "arrow-schema", + "arrow-select", "arrow-string", "async-trait", "bitvec", "cfg-if", "enum-iterator", "enum-map", - "flatbuffers 25.12.19", + "flatbuffers", "futures", "getrandom 0.3.4", "humansize", @@ -2614,7 +2534,7 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4bf1a90619f7ef3f45b3bff8f177fedfc3e00c79db3de3839600a158c2a80ac" dependencies = [ - "arrow-buffer 56.2.0", + "arrow-buffer", "bitvec", "bytes", "cudarc", @@ -2645,9 +2565,9 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dc8da56c88eee6485942ad34ee1481e2c575b9d07847aa4599c1bb24d9f8449" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", - "arrow-schema 56.2.0", + "arrow-array", + "arrow-buffer", + "arrow-schema", "num-traits", "vortex-buffer", "vortex-dtype", @@ -2694,9 +2614,9 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad4bb9776fe0483b3c74180515a14de8a3b27efe17bc8b4cff0781229faf9141" dependencies = [ - "arrow-buffer 56.2.0", - "arrow-schema 56.2.0", - "flatbuffers 25.12.19", + "arrow-buffer", + "arrow-schema", + "flatbuffers", "half", "itertools", "jiff", @@ -2719,8 +2639,8 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3d205fa3696ba6040dbd710404922c1b41da8c4231bc4629b617c3f3bb98328" dependencies = [ - "arrow-schema 56.2.0", - "flatbuffers 25.12.19", + "arrow-schema", + "flatbuffers", "jiff", "prost", "tokio", @@ -2734,7 +2654,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9932a1ab9f0cf69aba55dbbe12616c450c8e881580c0789626b31822b28efbd2" dependencies = [ "arrayref", - "arrow-buffer 56.2.0", + "arrow-buffer", "fastlanes", "itertools", "lending-iterator", @@ -2762,7 +2682,7 @@ dependencies = [ "async-trait", "bytes", "cudarc", - "flatbuffers 25.12.19", + "flatbuffers", "futures", "getrandom 0.3.4", "itertools", @@ -2804,7 +2724,7 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0f536161b5661ec03eb6613596d613374a501cf0e07ce722dcbd1d6d9db71e2" dependencies = [ - "flatbuffers 25.12.19", + "flatbuffers", "vortex-buffer", ] @@ -2893,7 +2813,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fdc271a6bb8b9e7d4357e800e99dee518ae8db6bb6bc69b84ceb7bbd4a01008" dependencies = [ "bytes", - "flatbuffers 25.12.19", + "flatbuffers", "futures", "itertools", "pin-project-lite", @@ -2911,11 +2831,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "131022b7d32a2e9bedbf2be93f9d604ec3d904d8d57bc3622b7a26ce8d78df25" dependencies = [ "arcref", - "arrow-buffer 56.2.0", + "arrow-buffer", "async-stream", "async-trait", "cudarc", - "flatbuffers 25.12.19", + "flatbuffers", "futures", "itertools", "kanal", @@ -3007,8 +2927,8 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da7646fdcb086f02af3345d26f76706c2aab58e19f7a52a40231a809a7ce19ed" dependencies = [ - "arrow-array 56.2.0", - "arrow-buffer 56.2.0", + "arrow-array", + "arrow-buffer", "itertools", "num-traits", "prost", @@ -3026,7 +2946,7 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74be634609faaa3fc30e617ffdfdb098e0262064f06136e335ed386ef1347228" dependencies = [ - "arrow-array 56.2.0", + "arrow-array", "bytes", "itertools", "num-traits", @@ -3045,8 +2965,8 @@ version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a5070a976d0f766621014e766fad3235904a3952b8a450327439bf9f93b93fc" dependencies = [ - "arrow-array 56.2.0", - "arrow-schema 56.2.0", + "arrow-array", + "arrow-schema", "bit-vec", "cudarc", "futures", diff --git a/vine-core/Cargo.toml b/vine-core/Cargo.toml index 503ca06..c1a3183 100644 --- a/vine-core/Cargo.toml +++ b/vine-core/Cargo.toml @@ -13,12 +13,13 @@ lazy_static = "1.4" base64 = "0.22" # Apache Arrow for JNI data transfer (Arrow IPC format) -# Note: Using specific versions to avoid chrono compatibility issues -arrow-schema = { version = "53.4", default-features = false } -arrow-array = { version = "53.4", default-features = false } -arrow-buffer = { version = "53.4", default-features = false } -arrow-data = { version = "53.4", default-features = false } -arrow-ipc = { version = "53.4", default-features = false } +# Must match Vortex v0.56.0's arrow dependency version +arrow-schema = { version = "56.2", default-features = false } +arrow-array = { version = "56.2", default-features = false } +arrow-buffer = { version = "56.2", default-features = false } +arrow-data = { version = "56.2", default-features = false } +arrow-ipc = { version = "56.2", default-features = false } +arrow-cast = { version = "56.2", default-features = false } # Vortex (primary storage format) vortex = { version = "0.56.0", features = ["tokio"] } @@ -29,7 +30,7 @@ async-fs = { version = "2" } [dev-dependencies] tempfile = "3.8" -arrow-schema = { version = "53.4", default-features = false } +arrow-schema = { version = "56.2", default-features = false } tokio = { version = "1", features = ["rt-multi-thread", "macros"] } chrono = "0.4" vortex-dtype = { version = "0.56.0" } @@ -37,3 +38,7 @@ vortex-dtype = { version = "0.56.0" } [lib] name = "vine_core" crate-type = ["cdylib", "rlib"] + +[[bin]] +name = "generate-test-data" +path = "src/bin/generate_test_data.rs" diff --git a/vine-core/src/arrow_bridge.rs b/vine-core/src/arrow_bridge.rs index 2058eba..6c9690b 100644 --- a/vine-core/src/arrow_bridge.rs +++ b/vine-core/src/arrow_bridge.rs @@ -1,37 +1,28 @@ use std::io::Cursor; use std::sync::Arc; - -use arrow_array::{ - Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, - Int8Array, Int16Array, Int32Array, Int64Array, StringArray, RecordBatch, -}; -use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; +use arrow_array::RecordBatch; +use arrow_schema::{ArrowError, DataType, Schema}; use arrow_ipc::reader::StreamReader; use arrow_ipc::writer::StreamWriter; -use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; +use arrow_cast::cast; + +use vortex::ArrayRef as VortexArrayRef; +use vortex::arrow::FromArrowArray; use crate::metadata::{Metadata, MetadataField}; -use crate::vortex_exp::{self, VortexResult}; -use vortex::{Array as VortexArray, ArrayRef as VortexArrayRef}; -use vortex::arrays::{BoolArray, PrimitiveArray, StructArray}; -use vortex::validity::Validity; -use vortex_dtype::{DType, Nullability, PType}; /// Result type for Arrow bridge operations pub type ArrowBridgeResult = Result>; +// ============================================================================ +// Arrow IPC Serialization +// ============================================================================ + /// Deserialize Arrow IPC bytes into RecordBatch -/// -/// # Arguments -/// * `data` - Arrow IPC stream bytes from JVM -/// -/// # Returns -/// * `RecordBatch` containing the deserialized data pub fn deserialize_arrow_ipc(data: &[u8]) -> Result { let cursor = Cursor::new(data); let mut reader = StreamReader::try_new(cursor, None)?; - // Read first (and only) batch match reader.next() { Some(Ok(batch)) => Ok(batch), Some(Err(e)) => Err(e), @@ -40,12 +31,6 @@ pub fn deserialize_arrow_ipc(data: &[u8]) -> Result { } /// Serialize RecordBatch to Arrow IPC bytes -/// -/// # Arguments -/// * `batch` - RecordBatch to serialize -/// -/// # Returns -/// * `Vec` containing Arrow IPC stream bytes for JVM pub fn serialize_arrow_ipc(batch: &RecordBatch) -> Result, ArrowError> { let mut buffer = Vec::new(); { @@ -56,22 +41,72 @@ pub fn serialize_arrow_ipc(batch: &RecordBatch) -> Result, ArrowError> { Ok(buffer) } -/// Convert Vine metadata to Arrow schema +// ============================================================================ +// Direct Arrow ↔ Vortex Conversion +// ============================================================================ + +/// Convert Arrow RecordBatch directly to Vortex ArrayRef /// -/// # Note -/// This is used by the CSV bridge functions (csv_rows_to_record_batch). -/// Will be removed when direct Arrow->Vortex conversion is implemented. -fn metadata_to_arrow_schema(metadata: &Metadata) -> ArrowBridgeResult { - let fields: Vec = metadata - .fields - .iter() - .map(|field| { - let arrow_type = vine_type_to_arrow(&field.data_type); - Field::new(&field.name, arrow_type, !field.is_required) - }) - .collect(); +/// Uses Vortex native `FromArrowArray` trait for zero-copy conversion +pub fn arrow_to_vortex(batch: &RecordBatch) -> ArrowBridgeResult { + Ok(VortexArrayRef::from_arrow(batch, false)) +} - Ok(Schema::new(fields)) +/// Convert Vortex ArrayRef directly to Arrow RecordBatch +/// +/// # Arguments +/// * `vortex_array` - Source Vortex array +/// * `compat_mode` - If true, converts modern Arrow types (e.g. Utf8View) to legacy types (e.g. Utf8) +pub fn vortex_to_arrow(vortex_array: &VortexArrayRef, compat_mode: bool) -> ArrowBridgeResult { + match RecordBatch::try_from(vortex_array.as_ref()) { + Ok(batch) => { + if compat_mode { + // Ensure compatibility with older Arrow implementations (e.g. Java 14, Spark 3.5) + make_batch_compatible(&batch) + } else { + Ok(batch) + } + } + Err(e) => Err(format!("Failed to convert Vortex array to Arrow RecordBatch: {}", e).into()), + } +} + +/// Ensure RecordBatch is compatible with older Arrow implementations (e.g. Java 14). +/// +/// Converts modern types (Utf8View, BinaryView) back to standard Utf8/Binary. +pub fn make_batch_compatible(batch: &RecordBatch) -> ArrowBridgeResult { + let mut new_columns = Vec::new(); + let mut new_fields = Vec::new(); + let mut changed = false; + + for (field, column) in batch.schema().fields().iter().zip(batch.columns()) { + let (new_field, new_column) = match field.data_type() { + DataType::Utf8View => { + changed = true; + let casted = cast(column, &DataType::Utf8)?; + let mut f = field.as_ref().clone(); + f.set_data_type(DataType::Utf8); + (Arc::new(f), casted) + } + DataType::BinaryView => { + changed = true; + let casted = cast(column, &DataType::Binary)?; + let mut f = field.as_ref().clone(); + f.set_data_type(DataType::Binary); + (Arc::new(f), casted) + } + _ => (field.clone(), column.clone()), + }; + new_fields.push(new_field); + new_columns.push(new_column); + } + + if changed { + let new_schema = Arc::new(Schema::new(new_fields)); + Ok(RecordBatch::try_new(new_schema, new_columns)?) + } else { + Ok(batch.clone()) + } } /// Convert Arrow schema to Vine metadata @@ -94,25 +129,6 @@ pub fn arrow_schema_to_metadata(schema: &Schema, table_name: &str) -> Metadata { Metadata::new(table_name, fields) } -/// Convert Vine type string to Arrow DataType -fn vine_type_to_arrow(vine_type: &str) -> DataType { - match vine_type.to_lowercase().as_str() { - "byte" | "tinyint" => DataType::Int8, - "short" | "smallint" => DataType::Int16, - "integer" | "int" => DataType::Int32, - "long" | "bigint" => DataType::Int64, - "float" => DataType::Float32, - "double" => DataType::Float64, - "boolean" | "bool" => DataType::Boolean, - "string" => DataType::Utf8, - "binary" => DataType::Binary, - "date" => DataType::Date32, // Days since epoch - "timestamp" => DataType::Timestamp(TimeUnit::Millisecond, None), - "decimal" => DataType::Utf8, // Stored as string for precision - _ => DataType::Utf8, // Fallback - } -} - /// Convert Arrow DataType to Vine type string fn arrow_type_to_vine(arrow_type: &DataType) -> String { match arrow_type { @@ -130,622 +146,3 @@ fn arrow_type_to_vine(arrow_type: &DataType) -> String { _ => "string".to_string(), // Fallback } } - -// ============================================================================ -// Temporary CSV Bridge Utilities -// ============================================================================ -// -// TODO: Replace these utility functions with direct Arrow ↔ Vortex conversion -// -// These functions isolate the CSV conversion logic so it can be easily replaced -// with direct conversion once Vortex API is stable. When implementing direct -// conversion, only these two functions need to be modified: -// -// 1. arrow_to_storage_format() - Replace CSV conversion with direct Arrow → Vortex -// 2. storage_format_to_arrow() - Replace CSV conversion with direct Vortex → Arrow -// -// Impact: Changing only these two functions will update all JNI write/read paths -// ============================================================================ - -/// Convert Arrow RecordBatch to storage format (currently CSV, future: direct Vortex) -/// -/// **TODO: Replace CSV conversion with direct Arrow → Vortex when Vortex API is stable** -/// -/// # Arguments -/// * `batch` - Arrow RecordBatch from JVM -/// -/// # Returns -/// * Storage format data (currently Vec of CSV rows) -/// -/// # Migration path -/// When implementing direct conversion: -/// 1. Change return type from Vec to VortexArrayRef -/// 2. Replace body with: `record_batch_to_vortex(batch)` (from direct_conversion mod) -/// 3. Update callers to use vortex writer instead of CSV writer -/// -pub fn arrow_to_storage_format(batch: &RecordBatch) -> ArrowBridgeResult> { - // TODO: Replace with direct conversion - // return Ok(record_batch_to_vortex(batch)?); - record_batch_to_csv_rows(batch) -} - -/// Convert storage format to Arrow RecordBatch (currently from CSV, future: direct from Vortex) -/// -/// **TODO: Replace CSV conversion with direct Vortex → Arrow when Vortex API is stable** -/// -/// # Arguments -/// * `data` - Storage format data (currently Vec of CSV rows) -/// * `metadata` - Vine metadata for schema -/// -/// # Returns -/// * Arrow RecordBatch for JVM -/// -/// # Migration path -/// When implementing direct conversion: -/// 1. Change first parameter type from Vec to VortexArrayRef -/// 2. Replace body with: `vortex_to_record_batch(vortex_array, metadata)` (from direct_conversion mod) -/// 3. Update callers to pass vortex array instead of CSV rows -/// -pub fn storage_format_to_arrow( - csv_rows: &[String], - metadata: &Metadata, -) -> ArrowBridgeResult { - // TODO: Replace with direct conversion - // return Ok(vortex_to_record_batch(vortex_array, metadata)?); - csv_rows_to_record_batch(csv_rows, metadata) -} - -/// Convert RecordBatch to CSV rows for Vortex writer -/// -/// # Note -/// This function is a temporary bridge between Arrow IPC and CSV-based Vortex writer. -/// Will be replaced with direct Arrow → Vortex conversion in future -/// -fn record_batch_to_csv_rows(batch: &RecordBatch) -> ArrowBridgeResult> { - let num_rows = batch.num_rows(); - let num_cols = batch.num_columns(); - let mut rows = Vec::with_capacity(num_rows); - - for row_idx in 0..num_rows { - let mut values = Vec::with_capacity(num_cols); - - for col_idx in 0..num_cols { - let column = batch.column(col_idx); - let value = extract_value(column, row_idx); - values.push(value); - } - - rows.push(values.join(",")); - } - - Ok(rows) -} - -/// Convert CSV rows to RecordBatch for JNI return -/// -/// # Note -/// This function is a temporary bridge between CSV-based reader and Arrow IPC. -/// Will be replaced with direct Vortex → Arrow conversion in future -/// -fn csv_rows_to_record_batch( - rows: &[String], - metadata: &Metadata, -) -> ArrowBridgeResult { - let schema = metadata_to_arrow_schema(metadata)?; - let num_rows = rows.len(); - - // Parse rows into columns - let parsed_rows: Vec> = rows - .iter() - .map(|row| row.split(',').map(|s| s.trim()).collect()) - .collect(); - - // Build column arrays - let mut columns: Vec = Vec::with_capacity(metadata.fields.len()); - - for (col_idx, field) in metadata.fields.iter().enumerate() { - let values: Vec<&str> = parsed_rows - .iter() - .map(|row| row.get(col_idx).copied().unwrap_or("")) - .collect(); - - let array = build_arrow_array(&field.data_type, &values, num_rows)?; - columns.push(array); - } - - let batch = RecordBatch::try_new(Arc::new(schema), columns)?; - Ok(batch) -} - -/// Extract value from Arrow array at given index -fn extract_value(column: &ArrayRef, row_idx: usize) -> String { - if column.is_null(row_idx) { - return String::new(); - } - - match column.data_type() { - DataType::Int8 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Int16 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Int32 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Int64 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Float32 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Float64 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Boolean => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Utf8 => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() - } - DataType::Binary => { - let arr = column.as_any().downcast_ref::().unwrap(); - base64_encode(arr.value(row_idx)) - } - DataType::Date32 => { - let arr = column.as_any().downcast_ref::().unwrap(); - days_to_date_string(arr.value(row_idx)) - } - DataType::Timestamp(_, _) => { - let arr = column.as_any().downcast_ref::().unwrap(); - arr.value(row_idx).to_string() // Return millis as string - } - _ => String::new(), - } -} - -/// Build Arrow array from string values based on Vine type -fn build_arrow_array( - type_str: &str, - values: &[&str], - _num_rows: usize, -) -> ArrowBridgeResult { - match type_str.to_lowercase().as_str() { - "byte" | "tinyint" => { - let arr: Int8Array = values.iter().map(|v| v.parse::().ok()).collect(); - Ok(Arc::new(arr)) - } - "short" | "smallint" => { - let arr: Int16Array = values.iter().map(|v| v.parse::().ok()).collect(); - Ok(Arc::new(arr)) - } - "integer" | "int" => { - let arr: Int32Array = values.iter().map(|v| v.parse::().ok()).collect(); - Ok(Arc::new(arr)) - } - "long" | "bigint" => { - let arr: Int64Array = values.iter().map(|v| v.parse::().ok()).collect(); - Ok(Arc::new(arr)) - } - "float" => { - let arr: Float32Array = values.iter().map(|v| v.parse::().ok()).collect(); - Ok(Arc::new(arr)) - } - "double" => { - let arr: Float64Array = values.iter().map(|v| v.parse::().ok()).collect(); - Ok(Arc::new(arr)) - } - "boolean" | "bool" => { - let arr: BooleanArray = values - .iter() - .map(|v| Some(matches!(v.to_lowercase().as_str(), "true" | "1" | "yes"))) - .collect(); - Ok(Arc::new(arr)) - } - "string" | "decimal" => { - let arr: StringArray = values.iter().map(|v| Some(*v)).collect(); - Ok(Arc::new(arr)) - } - "binary" => { - let decoded: Vec>> = values - .iter() - .map(|v| base64_decode(v).ok()) - .collect(); - let arr: BinaryArray = decoded - .iter() - .map(|opt| opt.as_ref().map(|v| v.as_slice())) - .collect(); - Ok(Arc::new(arr)) - } - "date" => { - let arr: Int32Array = values.iter().map(|v| Some(parse_date_to_days(v))).collect(); - Ok(Arc::new(arr)) - } - "timestamp" => { - let arr: Int64Array = values - .iter() - .map(|v| Some(parse_timestamp_to_millis(v))) - .collect(); - Ok(Arc::new(arr)) - } - _ => { - let arr: StringArray = values.iter().map(|v| Some(*v)).collect(); - Ok(Arc::new(arr)) - } - } -} - -// ============================================================================ -// Helper Functions -// ============================================================================ - -/// Parse date string (YYYY-MM-DD) to days since Unix epoch -fn parse_date_to_days(s: &str) -> i32 { - use chrono::NaiveDate; - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); - NaiveDate::parse_from_str(s, "%Y-%m-%d") - .map(|d| (d - epoch).num_days() as i32) - .unwrap_or(0) -} - -/// Parse timestamp string to milliseconds since Unix epoch -fn parse_timestamp_to_millis(s: &str) -> i64 { - // Try parsing as epoch milliseconds first - if let Ok(millis) = s.parse::() { - return millis; - } - // Try ISO 8601 format - use chrono::DateTime; - if let Ok(dt) = DateTime::parse_from_rfc3339(s) { - return dt.timestamp_millis(); - } - 0 -} - -/// Convert days since epoch to date string (YYYY-MM-DD) -fn days_to_date_string(days: i32) -> String { - use chrono::NaiveDate; - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); - if let Some(date) = epoch.checked_add_signed(chrono::Duration::days(days as i64)) { - date.format("%Y-%m-%d").to_string() - } else { - "1970-01-01".to_string() - } -} - -/// Base64 encode bytes using the base64 crate -fn base64_encode(bytes: &[u8]) -> String { - BASE64.encode(bytes) -} - -/// Base64 decode string using the base64 crate -fn base64_decode(s: &str) -> Result, Box> { - BASE64.decode(s.trim()).map_err(|e| e.into()) -} - -// ============================================================================ -// Direct Arrow ↔ Vortex Conversion (No CSV intermediate) -// ============================================================================ -// -// TODO: Complete implementation of direct Arrow ↔ Vortex conversion -// -// Status: PARTIAL IMPLEMENTATION (disabled due to Vortex API issues) -// -// Current blockers: -// 1. PrimitiveArray::from_vec() API not available in current Vortex version -// 2. BoolArray::from_vec() API not available -// 3. VarBinViewBuilder API signatures don't match (push_null, finish methods) -// 4. Validity buffer conversion needs correct API usage -// 5. StructArray::from_fields() error handling -// -// Expected performance gain: 20-30% reduction in overhead -// -// Current workaround: Using CSV bridge (record_batch_to_csv_rows / csv_rows_to_record_batch) -// -// Next steps: -// 1. Update to stable Vortex API version with complete builder APIs -// 2. Fix Validity buffer conversion (Vortex validity ↔ Arrow null buffer) -// 3. Implement proper VarBinView builder usage for strings/binary -// 4. Add comprehensive tests for all data types -// 5. Benchmark performance vs CSV bridge -// -// References: -// - lib.rs: Arrow IPC JNI functions using CSV bridge (lines 220-314) -// - vortex_exp.rs: Existing CSV-based conversion (array_to_csv_rows, build_struct_array) -// -// ============================================================================ - -// Disable compilation of direct conversion code until Vortex API is fixed -#[cfg(feature = "direct-vortex-conversion")] -mod direct_conversion { -use super::*; - -/// Convert Arrow RecordBatch directly to Vortex StructArray -/// -/// **TODO: Currently disabled - requires Vortex API fixes** -/// -/// This function provides direct Arrow → Vortex conversion without CSV intermediate. -/// Eliminates 20-30% overhead compared to the CSV bridge approach. -/// -/// # Arguments -/// * `batch` - Arrow RecordBatch to convert -/// -/// # Returns -/// * `VortexArrayRef` - Vortex StructArray ready for file write -/// -/// # Status -/// Partial implementation with compilation errors. See module-level TODO for details. -pub fn record_batch_to_vortex(batch: &RecordBatch) -> ArrowBridgeResult { - use vortex::builders::ArrayBuilder; - use vortex::IntoArray; - - let schema = batch.schema(); - let num_rows = batch.num_rows(); - let num_cols = batch.num_columns(); - - // Build Vortex columns from Arrow columns - let mut vortex_columns: Vec = Vec::with_capacity(num_cols); - - for col_idx in 0..num_cols { - let arrow_column = batch.column(col_idx); - let field = schema.field(col_idx); - let vortex_array = arrow_array_to_vortex(arrow_column, field.data_type())?; - vortex_columns.push(vortex_array); - } - - // Build field names from schema - let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect(); - - // Create Vortex StructArray - let struct_array = StructArray::from_fields(field_names, vortex_columns) - .map_err(|e| format!("Failed to create Vortex StructArray: {}", e))?; - - Ok(struct_array.into_array()) -} - -/// Convert single Arrow array to Vortex array -/// -/// **TODO: Currently disabled - part of direct conversion implementation** -fn arrow_array_to_vortex(arrow_array: &ArrayRef, data_type: &DataType) -> ArrowBridgeResult { - use vortex::builders::VarBinViewBuilder; - use vortex::validity::Validity; - use vortex::IntoArray; - - match data_type { - DataType::Int8 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| if arr.is_null(i) { 0 } else { arr.value(i) }).collect(); - let validity = build_validity(arr.nulls()); - Ok(PrimitiveArray::from_vec(values, validity).into_array()) - } - DataType::Int16 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| if arr.is_null(i) { 0 } else { arr.value(i) }).collect(); - let validity = build_validity(arr.nulls()); - Ok(PrimitiveArray::from_vec(values, validity).into_array()) - } - DataType::Int32 | DataType::Date32 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| if arr.is_null(i) { 0 } else { arr.value(i) }).collect(); - let validity = build_validity(arr.nulls()); - Ok(PrimitiveArray::from_vec(values, validity).into_array()) - } - DataType::Int64 | DataType::Timestamp(_, _) | DataType::Date64 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| if arr.is_null(i) { 0 } else { arr.value(i) }).collect(); - let validity = build_validity(arr.nulls()); - Ok(PrimitiveArray::from_vec(values, validity).into_array()) - } - DataType::Float32 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| if arr.is_null(i) { 0.0 } else { arr.value(i) }).collect(); - let validity = build_validity(arr.nulls()); - Ok(PrimitiveArray::from_vec(values, validity).into_array()) - } - DataType::Float64 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| if arr.is_null(i) { 0.0 } else { arr.value(i) }).collect(); - let validity = build_validity(arr.nulls()); - Ok(PrimitiveArray::from_vec(values, validity).into_array()) - } - DataType::Boolean => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let values: Vec = (0..arr.len()).map(|i| !arr.is_null(i) && arr.value(i)).collect(); - let validity = build_validity(arr.nulls()); - Ok(BoolArray::from_vec(values, validity).into_array()) - } - DataType::Utf8 | DataType::LargeUtf8 => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let mut builder = VarBinViewBuilder::::new(); - for i in 0..arr.len() { - if arr.is_null(i) { - builder.push_null(); - } else { - builder.push_value(arr.value(i)); - } - } - Ok(builder.finish(DType::Utf8(Nullability::Nullable)).into_array()) - } - DataType::Binary | DataType::LargeBinary => { - let arr = arrow_array.as_any().downcast_ref::().unwrap(); - let mut builder = VarBinViewBuilder::<[u8]>::new(); - for i in 0..arr.len() { - if arr.is_null(i) { - builder.push_null(); - } else { - builder.push_value(arr.value(i)); - } - } - Ok(builder.finish(DType::Binary(Nullability::Nullable)).into_array()) - } - _ => Err(format!("Unsupported Arrow data type: {:?}", data_type).into()), - } -} - -/// Build Vortex Validity from Arrow nulls buffer -/// -/// **TODO: Currently disabled - part of direct conversion implementation** -fn build_validity(nulls: Option<&arrow_buffer::NullBuffer>) -> Validity { - match nulls { - Some(null_buffer) => { - // Convert Arrow null buffer to Vortex validity - let null_count = null_buffer.null_count(); - if null_count == 0 { - Validity::NonNullable - } else { - // Extract null bitmap - let buffer = null_buffer.inner(); - Validity::from(buffer.clone()) - } - } - None => Validity::NonNullable, - } -} - -/// Convert Vortex StructArray directly to Arrow RecordBatch -/// -/// **TODO: Currently disabled - requires Vortex API fixes** -/// -/// This function provides direct Vortex → Arrow conversion without CSV intermediate. -/// Eliminates 20-30% overhead compared to the CSV bridge approach. -/// -/// # Arguments -/// * `vortex_array` - Vortex StructArray from file read -/// * `metadata` - Vine metadata for schema information -/// -/// # Returns -/// * `RecordBatch` - Arrow RecordBatch ready for IPC serialization -/// -/// # Status -/// Partial implementation with compilation errors. See module-level TODO for details. -pub fn vortex_to_record_batch(vortex_array: &VortexArrayRef, metadata: &Metadata) -> ArrowBridgeResult { - use vortex::arrays::StructArray; - - // Cast to StructArray - let struct_array = StructArray::try_from(vortex_array) - .map_err(|e| format!("Failed to cast to StructArray: {}", e))?; - - let num_rows = vortex_exp::get_row_count(vortex_array); - - // Build Arrow schema from metadata - let arrow_fields: Vec = metadata.fields.iter().map(|f| { - let arrow_type = vine_type_to_arrow(&f.data_type); - Field::new(&f.name, arrow_type, !f.is_required) - }).collect(); - let arrow_schema = Arc::new(Schema::new(arrow_fields)); - - // Convert each Vortex column to Arrow column - let mut arrow_columns: Vec = Vec::with_capacity(metadata.fields.len()); - - for (idx, field) in metadata.fields.iter().enumerate() { - let vortex_child = struct_array.field(idx) - .ok_or_else(|| format!("Missing field at index {}", idx))?; - - let arrow_array = vortex_array_to_arrow(&vortex_child, &field.data_type, num_rows)?; - arrow_columns.push(arrow_array); - } - - // Create RecordBatch - let batch = RecordBatch::try_new(arrow_schema, arrow_columns) - .map_err(|e| format!("Failed to create RecordBatch: {}", e))?; - - Ok(batch) -} - -/// Convert single Vortex array to Arrow array -/// -/// **TODO: Currently disabled - part of direct conversion implementation** -fn vortex_array_to_arrow(vortex_array: &VortexArrayRef, vine_type: &str, num_rows: usize) -> ArrowBridgeResult { - match vine_type.to_lowercase().as_str() { - "byte" | "tinyint" => { - let prim = vortex_array.to_primitive(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = prim.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(Int8Array::from(values))) - } - "short" | "smallint" => { - let prim = vortex_array.to_primitive(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = prim.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(Int16Array::from(values))) - } - "integer" | "int" | "date" => { - let prim = vortex_array.to_primitive(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = prim.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(Int32Array::from(values))) - } - "long" | "bigint" | "timestamp" => { - let prim = vortex_array.to_primitive(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = prim.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(Int64Array::from(values))) - } - "float" => { - let prim = vortex_array.to_primitive(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = prim.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(Float32Array::from(values))) - } - "double" => { - let prim = vortex_array.to_primitive(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = prim.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(Float64Array::from(values))) - } - "boolean" | "bool" => { - let bool_arr = vortex_array.to_bool(); - let values: Vec> = (0..num_rows).map(|i| { - let scalar = bool_arr.scalar_at(i); - scalar.as_ref().try_into().ok() - }).collect(); - Ok(Arc::new(BooleanArray::from(values))) - } - "string" | "decimal" => { - // Vortex strings are stored as VarBinView - let values: Vec> = (0..num_rows).map(|i| { - if vortex_array.is_valid(i) { - // Extract string value from Vortex array - vortex_exp::extract_string_value(vortex_array, i).ok() - } else { - None - } - }).collect(); - Ok(Arc::new(StringArray::from(values))) - } - "binary" => { - let values: Vec>> = (0..num_rows).map(|i| { - if vortex_array.is_valid(i) { - vortex_exp::extract_binary_value(vortex_array, i).ok() - } else { - None - } - }).collect(); - Ok(Arc::new(BinaryArray::from(values))) - } - _ => Err(format!("Unsupported Vine type: {}", vine_type).into()), - } -} - -} // end mod direct_conversion - diff --git a/vine-core/src/lib.rs b/vine-core/src/lib.rs index dd6f8ad..d6752e4 100644 --- a/vine-core/src/lib.rs +++ b/vine-core/src/lib.rs @@ -1,7 +1,6 @@ pub mod metadata; pub mod writer_config; pub mod writer_cache; -pub mod streaming_writer; pub mod streaming_writer_v2; pub mod vine_batch_writer; pub mod vine_streaming_writer; @@ -33,7 +32,9 @@ lazy_static::lazy_static! { // Reader JNI Functions // ============================================================================ -/// Read data from Vine storage +/// Read data from Vine storage and return as string (legacy compat) +/// +/// Reads Vortex arrays from storage, converts to Arrow RecordBatch #[no_mangle] #[allow(non_snake_case)] #[allow(unused_variables)] @@ -47,12 +48,35 @@ pub extern "C" fn Java_io_kination_vine_VineModule_readDataFromVine( .expect("Cannot find data in 'dir_path'") .into(); - let rows: Vec = read_vine_data(&path); + let arrays = read_vine_data(&path); let mut result: String = String::new(); - for row in rows { - result.push_str(&row); - result.push('\n') + for array in arrays { + // use compat mode (true) for legacy string formatting + // TODO: Add option to disable compat mode and use arrow_cast display + match vortex_to_arrow(&array, true) { + Ok(batch) => { + // Use arrow_cast display for proper formatting + let options = arrow_cast::display::FormatOptions::default(); + let formatters: Vec<_> = batch + .columns() + .iter() + .map(|c| arrow_cast::display::ArrayFormatter::try_new(c.as_ref(), &options).unwrap()) + .collect(); + + for row_idx in 0..batch.num_rows() { + let mut row_values: Vec = Vec::with_capacity(batch.num_columns()); + for formatter in &formatters { + row_values.push(formatter.value(row_idx).to_string()); + } + result.push_str(&row_values.join(",")); + result.push('\n'); + } + } + Err(e) => { + eprintln!("Error converting Vortex to Arrow: {}", e); + } + } } let output = CString::new(result).expect("Cannot generate CString from result"); @@ -61,14 +85,6 @@ pub extern "C" fn Java_io_kination_vine_VineModule_readDataFromVine( .into_raw() } -// ============================================================================ -// Batch Writer JNI Functions -// ============================================================================ -// -// Note: CSV-based batch write functions have been removed in favor of Arrow IPC. -// Use batchWriteArrow for better performance (5-10x faster than CSV format). -// ============================================================================ - // ============================================================================ // Streaming Writer JNI Functions // ============================================================================ @@ -137,20 +153,12 @@ pub extern "C" fn Java_io_kination_vine_VineModule_streamingClose( // Arrow IPC JNI Functions // ============================================================================ -use arrow_bridge::{deserialize_arrow_ipc, serialize_arrow_ipc, arrow_to_storage_format, storage_format_to_arrow}; -use metadata::Metadata; +use arrow_bridge::{deserialize_arrow_ipc, serialize_arrow_ipc, arrow_to_vortex, vortex_to_arrow}; /// Batch write data using Arrow IPC format /// -/// This function receives Arrow IPC bytes from JVM, deserializes to RecordBatch, -/// converts to storage format (currently CSV), and writes via Vortex writer. -/// -/// TODO: -/// Update arrow_to_storage_format() to make direct Arrow → Vortex conversion -/// Migration process (when Vortex API is ready) -/// 1. Update arrow_bridge::arrow_to_storage_format() to use direct Arrow → Vortex -/// 2. Update VineBatchWriter to accept Vortex arrays instead of CSV -/// 3. No changes needed in this function - it will automatically benefit +/// Receives Arrow IPC bytes from JVM, converts directly to Vortex array, +/// and writes to storage. #[no_mangle] #[allow(non_snake_case)] #[allow(unused_variables)] @@ -180,30 +188,22 @@ pub extern "C" fn Java_io_kination_vine_VineModule_batchWriteArrow( let batch = deserialize_arrow_ipc(byte_slice) .expect("Failed to deserialize Arrow IPC"); - // Convert Arrow to storage format (currently CSV, future: direct Vortex) - // TODO: This will automatically use direct conversion once arrow_to_storage_format() is updated - let storage_data = arrow_to_storage_format(&batch) - .expect("Failed to convert Arrow to storage format"); - - let rows_refs: Vec<&str> = storage_data.iter().map(|s| s.as_str()).collect(); + // Arrow → Vortex conversion + let vortex_array = arrow_to_vortex(&batch) + .expect("Failed to convert Arrow to Vortex"); - // Write to storage - // TODO: Update VineBatchWriter to accept Vortex arrays when direct conversion is ready - VineBatchWriter::write(&path_str, &rows_refs) + // Write Vortex array directly to storage + VineBatchWriter::write(&path_str, &vortex_array) .expect("Failed to batch write"); } /// Read data and return as Arrow IPC format /// -/// This function reads from Vortex storage, converts to Arrow RecordBatch, +/// Reads Vortex arrays from storage, converts directly to Arrow RecordBatch, /// serializes to Arrow IPC bytes, and returns to JVM. /// -/// TODO: -/// Update storage_format_to_arrow() to make direct Vortex → Arrow conversion -/// Migration path (when Vortex API is ready) -/// 1. Update storage reader to return Vortex arrays instead of CSV -/// 2. Update arrow_bridge::storage_format_to_arrow() to use direct Vortex → Arrow -/// 3. No changes needed in this function - it will automatically benefit +/// # Arguments +/// * `compat_mode` - 0=None, 1=Compat (Java14/Spark3.5, Utf8View -> Utf8) #[no_mangle] #[allow(non_snake_case)] #[allow(unused_variables)] @@ -211,28 +211,27 @@ pub extern "C" fn Java_io_kination_vine_VineModule_readDataArrow( mut env: JNIEnv, class: JClass, dir_path: JString, + compat_mode: jni::sys::jint, ) -> jni::sys::jbyteArray { let path: String = env.get_string(&dir_path).expect("Failed to get path").into(); - // Load metadata for schema - let meta_path = format!("{}/vine_meta.json", path); - let metadata = Metadata::load(&meta_path) - .expect("Failed to load metadata"); - - // Read from storage (currently returns CSV, future: will return Vortex arrays) - // TODO: Update read_vine_data() to return Vortex arrays when direct conversion is ready - let storage_data: Vec = read_vine_data(&path); + // Read Vortex arrays directly from storage + let arrays = read_vine_data(&path); - if storage_data.is_empty() { - // Return empty byte array + if arrays.is_empty() { return env.new_byte_array(0) .expect("Failed to create empty byte array") .into_raw(); } - // Convert storage format to Arrow (currently from CSV, future: direct from Vortex) - let batch = storage_format_to_arrow(&storage_data, &metadata) - .expect("Failed to convert storage format to Arrow"); + // Convert first Vortex array to Arrow RecordBatch + // TODO: Support multiple arrays by concatenating + + // Determine compatibility mode from JNI argument + let use_compat = compat_mode == 1; + + let batch = vortex_to_arrow(&arrays[0], use_compat) + .expect("Failed to convert Vortex to Arrow"); // Serialize to Arrow IPC bytes let arrow_bytes = serialize_arrow_ipc(&batch) @@ -252,12 +251,8 @@ pub extern "C" fn Java_io_kination_vine_VineModule_readDataArrow( /// Append batch to streaming writer using Arrow IPC format /// -/// TODO: -/// Update arrow_to_storage_format() to make direct Arrow → Vortex conversion -/// Migration path (when Vortex API is ready) -/// 1. Update arrow_bridge::arrow_to_storage_format() to use direct Arrow → Vortex -/// 2. Update VineStreamingWriter to accept Vortex arrays instead of CSV -/// 3. No changes needed in this function - it will automatically benefit +/// Receives Arrow IPC bytes, converts directly to Vortex array, +/// and appends to the streaming writer. #[no_mangle] #[allow(non_snake_case)] #[allow(unused_variables)] @@ -285,17 +280,14 @@ pub extern "C" fn Java_io_kination_vine_VineModule_streamingAppendBatchArrow( let batch = deserialize_arrow_ipc(byte_slice) .expect("Failed to deserialize Arrow IPC"); - // Convert Arrow to storage format (currently CSV, future: direct Vortex) - let storage_data = arrow_to_storage_format(&batch) - .expect("Failed to convert Arrow to storage format"); - - let rows_refs: Vec<&str> = storage_data.iter().map(|s| s.as_str()).collect(); + // Arrow → Vortex conversion + let vortex_array = arrow_to_vortex(&batch) + .expect("Failed to convert Arrow to Vortex"); - // Use existing streaming writer - // TODO: Update VineStreamingWriter to accept Vortex arrays when direct conversion is ready + // Append Vortex array directly to streaming writer let mut writers = STREAMING_WRITERS.lock().unwrap(); if let Some(writer) = writers.get_mut(&writer_id) { - writer.append_batch(&rows_refs).expect("Failed to append batch"); + writer.append_batch(&vortex_array).expect("Failed to append batch"); } else { panic!("Writer ID {} not found", writer_id); }