diff --git a/vortex-cuda/benches/filter_cuda.rs b/vortex-cuda/benches/filter_cuda.rs index fb2b6f938c2..f2d9cb74259 100644 --- a/vortex-cuda/benches/filter_cuda.rs +++ b/vortex-cuda/benches/filter_cuda.rs @@ -21,6 +21,7 @@ use cudarc::driver::CudaView; use cudarc::driver::DevicePtr; use cudarc::driver::DevicePtrMut; use cudarc::driver::DeviceRepr; +use cudarc::driver::ValidAsZeroBits; use cudarc::driver::sys::CUevent_flags; use futures::executor::block_on; use vortex::error::VortexExpect; @@ -135,7 +136,15 @@ async fn run_filter_timed( /// Benchmark filter for a specific type. fn benchmark_filter_type(c: &mut Criterion, type_name: &str) where - T: CubFilterable + DeviceRepr + From + Debug + Clone + Send + Sync + 'static, + T: CubFilterable + + DeviceRepr + + ValidAsZeroBits + + From + + Debug + + Clone + + Send + + Sync + + 'static, { let mut group = c.benchmark_group("cuda"); diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 96382e91068..889a8158910 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -217,10 +217,12 @@ async fn export_arrow_validity_buffer( ) -> VortexResult<(Option, i64)> { let mask = validity.execute_mask(len, ctx.execution_ctx())?; let null_count = i64::try_from(mask.false_count())?; + let validity_bits = len + arrow_offset; + let validity_bytes = validity_bits.div_ceil(8); let validity_buffer = match mask { Mask::AllTrue(_) => return Ok((None, 0)), - Mask::AllFalse(len) => ByteBuffer::zeroed((len + arrow_offset).div_ceil(8)), + Mask::AllFalse(_) => ByteBuffer::zeroed(validity_bytes), values @ Mask::Values(_) => values.into_bit_buffer().into_inner().2, }; let validity = ctx diff --git a/vortex-cuda/src/executor.rs b/vortex-cuda/src/executor.rs index 5a4e826da81..aa18cc30899 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -12,6 +12,7 @@ use cudarc::driver::CudaSlice; use cudarc::driver::DeviceRepr; use cudarc::driver::LaunchArgs; use cudarc::driver::LaunchConfig; +use cudarc::driver::ValidAsZeroBits; use futures::future::BoxFuture; use tracing::debug; use tracing::trace; @@ -256,7 +257,7 @@ impl CudaExecutionCtx { data: D, ) -> VortexResult>> where - T: DeviceRepr + Debug + Send + Sync + 'static, + T: DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + 'static, D: AsRef<[T]> + Send + 'static, { self.stream.copy_to_device(data) diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs index 1b7c99c54a9..1120ac50e02 100644 --- a/vortex-cuda/src/stream.rs +++ b/vortex-cuda/src/stream.rs @@ -4,18 +4,22 @@ //! CUDA stream utility functions. use std::fmt::Debug; +use std::mem::size_of; +use std::mem::size_of_val; use std::ops::Deref; use std::sync::Arc; use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; use cudarc::driver::DeviceRepr; +use cudarc::driver::ValidAsZeroBits; use cudarc::driver::result::stream; use futures::future::BoxFuture; use kanal::Sender; use tracing::warn; use vortex::array::buffer::BufferHandle; use vortex::error::VortexResult; +use vortex::error::vortex_ensure; use vortex::error::vortex_err; use crate::CudaDeviceBuffer; @@ -62,22 +66,32 @@ impl VortexCudaStream { /// synchronously before returning. For **pinned** host memory the transfer /// is truly async and the source must stay alive until the copy completes /// (guaranteed by the returned future capturing it). + /// + /// The returned [`BufferHandle`] keeps the source byte length, while its + /// CUDA allocation may include zeroed tail padding. This is needed for + /// Arrow validity buffers passed to cuDF, which reads masks as 32-bit words. pub(crate) fn copy_to_device( &self, data: D, ) -> VortexResult>> where - T: DeviceRepr + Debug + Send + Sync + 'static, + T: DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + 'static, D: AsRef<[T]> + Send + 'static, { let host_slice: &[T] = data.as_ref(); + let byte_count = size_of_val(host_slice); + let allocation_len = padded_device_allocation_len::(byte_count)?; // `device_alloc` binds the CUDA context to the current thread. - let mut cuda_slice: CudaSlice = self.device_alloc(host_slice.len())?; + let mut cuda_slice: CudaSlice = self.device_alloc::(allocation_len)?; - self.memcpy_htod(host_slice, &mut cuda_slice) + let mut values = cuda_slice.slice_mut(..host_slice.len()); + self.memcpy_htod(host_slice, &mut values) .map_err(|e| vortex_err!("Failed to schedule H2D copy: {}", e))?; + zero_padding(self, &mut cuda_slice, host_slice.len())?; + let cuda_buf = CudaDeviceBuffer::new(cuda_slice); + let buffer = BufferHandle::new_device(Arc::new(cuda_buf)).slice(0..byte_count); let stream = Arc::clone(&self.0); Ok(Box::pin(async move { @@ -86,7 +100,7 @@ impl VortexCudaStream { // Keep source memory alive until copy completes. let _keep_alive = data; - Ok(BufferHandle::new_device(Arc::new(cuda_buf))) + Ok(buffer) })) } @@ -99,20 +113,62 @@ impl VortexCudaStream { /// For **pageable** host memory (the common case), `memcpy_htod` stages /// the source into a driver-managed pinned buffer before returning, so /// the source data is safe to drop after this call. + /// + /// Like [`copy_to_device`](Self::copy_to_device), this preserves the source + /// byte length on the returned handle while keeping any tail padding in the + /// backing CUDA allocation. pub(crate) fn copy_to_device_sync(&self, data: &[T]) -> VortexResult where - T: DeviceRepr + Debug + Send + Sync + 'static, + T: DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + 'static, { - let mut cuda_slice: CudaSlice = self.device_alloc(data.len())?; + let byte_count = size_of_val(data); + let allocation_len = padded_device_allocation_len::(byte_count)?; + let mut cuda_slice: CudaSlice = self.device_alloc(allocation_len)?; - self.memcpy_htod(data, &mut cuda_slice) + let mut values = cuda_slice.slice_mut(..data.len()); + self.memcpy_htod(data, &mut values) .map_err(|e| vortex_err!("Failed to schedule H2D copy: {}", e))?; + zero_padding(self, &mut cuda_slice, data.len())?; + let cuda_buf = CudaDeviceBuffer::new(cuda_slice); - Ok(BufferHandle::new_device(Arc::new(cuda_buf))) + Ok(BufferHandle::new_device(Arc::new(cuda_buf)).slice(0..byte_count)) } } +/// Returns the typed CUDA allocation length for `byte_count`. +/// +/// The backing allocation is padded for cuDF's 32-bit validity mask reads. +/// The returned length is in `T` elements. +fn padded_device_allocation_len(byte_count: usize) -> VortexResult { + let element_size = size_of::(); + vortex_ensure!( + element_size != 0, + "cannot copy zero-sized values to CUDA device" + ); + let min_allocation_bytes = byte_count.next_multiple_of(size_of::()); + Ok(min_allocation_bytes.div_ceil(element_size)) +} + +/// Zeroes the allocation tail after the copied values. +/// +/// Returned handles are sliced to the copied byte count; the trailing padding +/// exists so a final 32-bit mask read stays within the backing allocation. +fn zero_padding( + stream: &VortexCudaStream, + cuda_slice: &mut CudaSlice, + copied_len: usize, +) -> VortexResult<()> { + if copied_len >= cuda_slice.len() { + return Ok(()); + } + + let mut padding = cuda_slice.slice_mut(copied_len..); + stream + .memset_zeros(&mut padding) + .map_err(|e| vortex_err!("Failed to zero device buffer padding: {}", e)) +} + /// Registers a callback and asynchronously waits for its completion. /// /// This function can be used to asynchronously wait for events previously @@ -191,3 +247,47 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult VortexResult<()> { + assert_eq!(padded_device_allocation_len::(0)?, 0); + assert_eq!(padded_device_allocation_len::(1)?, 4); + assert_eq!(padded_device_allocation_len::(4)?, 4); + assert_eq!(padded_device_allocation_len::(5)?, 8); + assert_eq!(padded_device_allocation_len::(1)?, 1); + assert_eq!(padded_device_allocation_len::(5)?, 2); + Ok(()) + } + + #[crate::test] + async fn test_copy_to_device_preserves_visible_len_with_padding() -> VortexResult<()> { + let ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let handle = ctx.stream().copy_to_device(vec![0xab_u8])?.await?; + + assert_eq!(handle.len(), 1); + let host = handle.try_to_host()?.await?; + assert_eq!(host.as_slice(), &[0xab]); + + Ok(()) + } + + #[crate::test] + async fn test_copy_to_device_sync_preserves_visible_len_with_padding() -> VortexResult<()> { + let ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let handle = ctx.stream().copy_to_device_sync(&[1_u8, 2, 3, 4, 5])?; + + assert_eq!(handle.len(), 5); + let host = handle.try_to_host()?.await?; + assert_eq!(host.as_slice(), &[1, 2, 3, 4, 5]); + + Ok(()) + } +} diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 52fff823600..9aef8a427c3 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -68,17 +68,21 @@ pub unsafe extern "C" fn export_array( ) -> i32 { let mut ctx = CudaSession::create_execution_ctx(&SESSION).unwrap(); - let primitive = PrimitiveArray::from_iter(0u32..5); - let decimal = DecimalArray::from_iter(0i128..5, DecimalDType::new(38, 2)); - let strings = VarBinViewArray::from_iter_str([ - "one", - "two", - "this string is long three", - "four", - "this string is long five", + let primitive = PrimitiveArray::from_option_iter([Some(0u32), None, Some(2), Some(3), None]); + let decimal = DecimalArray::from_option_iter( + [Some(0i128), Some(1), None, Some(3), Some(4)], + DecimalDType::new(38, 2), + ); + let strings = VarBinViewArray::from_iter_nullable_str([ + Some("one"), + None, + Some("this string is long three"), + Some("four"), + None, ]); let dates = TemporalArray::new_date( - PrimitiveArray::from_iter([100i32, 200, 300, 400, 500]).into_array(), + PrimitiveArray::from_option_iter([Some(100i32), None, Some(300), Some(400), None]) + .into_array(), TimeUnit::Days, ); @@ -124,24 +128,24 @@ pub unsafe extern "C" fn validate_array( let array = make_array(array_data); let struct_array = array.as_struct(); - let primitive = UInt32Array::from_iter(0..5); - let decimal = Decimal128Array::from_iter_values(0..5) + let primitive = UInt32Array::from_iter([Some(0), None, Some(2), Some(3), None]); + let decimal = Decimal128Array::from_iter([Some(0i128), Some(1), None, Some(3), Some(4)]) .with_precision_and_scale(38, 2) .expect("with_precision_and_scale"); - let string = StringArray::from_iter_values([ - "one", - "two", - "this string is long three", - "four", - "this string is long five", + let string = StringArray::from_iter([ + Some("one"), + None, + Some("this string is long three"), + Some("four"), + None, ]); - let date = Date32Array::from(vec![100i32, 200, 300, 400, 500]); + let date = Date32Array::from(vec![Some(100i32), None, Some(300), Some(400), None]); let expected_fields = Fields::from_iter([ - Field::new("prims", primitive.data_type().clone(), false), - Field::new("decimals", decimal.data_type().clone(), false), - Field::new("strings", string.data_type().clone(), false), - Field::new("dates", date.data_type().clone(), false), + Field::new("prims", primitive.data_type().clone(), true), + Field::new("decimals", decimal.data_type().clone(), true), + Field::new("strings", string.data_type().clone(), true), + Field::new("dates", date.data_type().clone(), true), ]); assert_eq!(