diff --git a/encodings/runend/src/compress.rs b/encodings/runend/src/compress.rs
index ba46588d88b..18f1ce7c15b 100644
--- a/encodings/runend/src/compress.rs
+++ b/encodings/runend/src/compress.rs
@@ -6,6 +6,7 @@ use vortex_array::ArrayRef;
 use vortex_array::ArrayView;
 use vortex_array::ExecutionCtx;
 use vortex_array::IntoArray;
+use vortex_array::arrays::Bool;
 use vortex_array::arrays::BoolArray;
 use vortex_array::arrays::ConstantArray;
 use vortex_array::arrays::Primitive;
@@ -94,6 +95,67 @@ pub fn runend_encode(
     (ends, values)
 }
 
+/// Run-end encode a `BoolArray`, returning a tuple of `(ends, values)`.
+///
+/// Nulls count as a value of their own, so a change in validity starts a new run just like a
+/// change between `true` and `false`. Each entry of `ends` is the exclusive end offset of a run;
+/// `ends` is passed through `narrow` and tagged with an exact `IsStrictSorted` statistic.
+///
+/// The `values` array is a [`BoolArray`] for ordinary inputs. For all-invalid inputs it is a
+/// single-row null [`ConstantArray`].
+///
+/// # Panics
+///
+/// Panics (via `vortex_expect`) if the validity cannot be derived, if a validity array cannot be
+/// converted to bool, or if narrowing `ends` fails.
+pub fn runend_encode_bool(
+    array: ArrayView,
+    ctx: &mut ExecutionCtx,
+) -> (PrimitiveArray, ArrayRef) {
+    let validity = match array
+        .validity()
+        .vortex_expect("run-end validity should be derivable")
+    {
+        Validity::NonNullable => None,
+        Validity::AllValid => None,
+        Validity::AllInvalid => {
+            // The whole array is one null run: a single end at `len` and a single null value.
+            // NOTE(review): this branch returns before the emptiness check below, so a
+            // zero-length all-invalid input would get one run ending at 0 - confirm intended.
+            let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable)
+                .narrow(ctx)
+                .vortex_expect("Ends must succeed downcasting");
+            ends.statistics()
+                .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
+            return (
+                ends,
+                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
+            );
+        }
+        Validity::Array(a) => {
+            // Materialize the per-element validity bits so that null runs can be detected.
+            let bool_array = a
+                .execute::(ctx)
+                .vortex_expect("validity array must be convertible to bool");
+            Some(bool_array.to_bit_buffer())
+        }
+    };
+
+    let bits = array.to_bit_buffer();
+    if bits.is_empty() {
+        // Nothing to encode: return empty ends and an empty bool values array.
+        let ends = PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable)
+            .narrow(ctx)
+            .vortex_expect("Ends must succeed downcasting");
+        ends.statistics()
+            .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
+        return (
+            ends,
+            BoolArray::new(BitBuffer::empty(), array.dtype().nullability().into()).into_array(),
+        );
+    }
+
+    // Without per-element validity only value changes delimit runs; otherwise nulls do too.
+    let (ends, values) = match validity {
+        None => runend_encode_bools(&bits, array.dtype().nullability().into()),
+        Some(validity) => runend_encode_nullable_bools(&bits, validity),
+    };
+
+    let ends = PrimitiveArray::new(ends, Validity::NonNullable)
+        .narrow(ctx)
+        .vortex_expect("Ends must succeed downcasting");
+
+    ends.statistics()
+        .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
+
+    (ends, values.into_array())
+}
+
 fn runend_encode_primitive(elements: &[T]) -> (Buffer, Buffer) {
     let mut ends = BufferMut::empty();
     let mut values = BufferMut::empty();
@@ -119,6 +181,81 @@ fn runend_encode_primitive(elements: &[T]) -> (Buffer, Buff
     (ends.freeze(), values.freeze())
 }
 
+/// Run-end encodes a non-empty bit buffer whose elements carry no per-element validity.
+///
+/// Returns the exclusive end offset of every run together with a [`BoolArray`] holding one value
+/// per run, constructed with the caller-supplied `validity`.
+fn runend_encode_bools(elements: &BitBuffer, validity: Validity) -> (Buffer, BoolArray) {
+    debug_assert!(!elements.is_empty());
+
+    let mut ends = BufferMut::empty();
+    // Capacity is an upper bound: there cannot be more runs than elements.
+    let mut values = BitBufferMut::with_capacity(elements.len());
+
+    let mut prev = elements.value(0);
+    // `end` counts the elements consumed so far, i.e. the exclusive end of the current run.
+    let mut end = 1;
+    for value in elements.iter().skip(1) {
+        if value != prev {
+            ends.push(end);
+            values.append(prev);
+        }
+        prev = value;
+        end += 1;
+    }
+    // Flush the final run, which the loop never closes.
+    ends.push(end);
+    values.append(prev);
+
+    (ends.freeze(), BoolArray::new(values.freeze(), validity))
+}
+
+/// Run-end encodes a non-empty bit buffer, treating null as a value distinct from both `true`
+/// and `false`.
+///
+/// Elements are compared as `Option` values (`None` when the validity bit is unset), so adjacent
+/// nulls merge into one run regardless of their underlying bits. A null run stores `false` in
+/// the values buffer and an unset bit in the run validity.
+fn runend_encode_nullable_bools(
+    elements: &BitBuffer,
+    element_validity: BitBuffer,
+) -> (Buffer, BoolArray) {
+    debug_assert!(!elements.is_empty());
+
+    let mut ends = BufferMut::empty();
+    let mut values = BitBufferMut::with_capacity(elements.len());
+    let mut validity = BitBufferMut::with_capacity(elements.len());
+
+    let mut prev = element_validity.value(0).then(|| elements.value(0));
+    let mut end = 1;
+    for value in elements
+        .iter()
+        .zip(element_validity.iter())
+        .map(|(value, is_valid)| is_valid.then_some(value))
+        .skip(1)
+    {
+        if value != prev {
+            ends.push(end);
+            match prev {
+                None => {
+                    validity.append(false);
+                    values.append(false);
+                }
+                Some(previous) => {
+                    validity.append(true);
+                    values.append(previous);
+                }
+            }
+        }
+        prev = value;
+        end += 1;
+    }
+    // Flush the final run, which the loop never closes.
+    ends.push(end);
+
+    match prev {
+        None => {
+            validity.append(false);
+            values.append(false);
+        }
+        Some(previous) => {
+            validity.append(true);
+            values.append(previous);
+        }
+    }
+
+    (
+        ends.freeze(),
+        BoolArray::new(values.freeze(), Validity::from(validity.freeze())),
+    )
+}
+
 fn runend_encode_nullable_primitive(
     elements: &[T],
     element_validity: BitBuffer,
@@ -322,6 +459,7 @@ pub fn runend_decode_varbinview(
 mod tests {
     use vortex_array::LEGACY_SESSION;
     use vortex_array::VortexSessionExecute;
+    use vortex_array::arrays::BoolArray;
     use vortex_array::arrays::PrimitiveArray;
     use vortex_array::assert_arrays_eq;
     use vortex_array::validity::Validity;
@@ -331,6 +469,7 @@ mod tests {
 
     use crate::compress::runend_decode_primitive;
     use crate::compress::runend_encode;
+    use crate::compress::runend_encode_bool;
 
     #[test]
     fn encode() -> VortexResult<()> {
@@ -383,6 +522,70 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn encode_bool() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        // Runs: [T, T] [F, F, F] [T] -> exclusive ends 2, 5, 6.
+        let arr = BoolArray::from_iter([true, true, false, false, false, true]);
+        let (ends, values) = runend_encode_bool(arr.as_view(), &mut ctx);
+        let values = values.execute::(&mut ctx)?;
+
+        let expected_ends = PrimitiveArray::from_iter(vec![2u8, 5, 6]);
+        assert_arrays_eq!(ends, expected_ends);
+        let expected_values = BoolArray::from_iter([true, false, true]);
+        assert_arrays_eq!(values, expected_values);
+        Ok(())
+    }
+
+    #[test]
+    fn encode_bool_nullable() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        // Runs: [T, T] [null, null] [F, F] [null] [T] -> exclusive ends 2, 4, 6, 7, 8.
+        let arr = BoolArray::from_iter([
+            Some(true),
+            Some(true),
+            None,
+            None,
+            Some(false),
+            Some(false),
+            None,
+            Some(true),
+        ]);
+        let (ends, values) = runend_encode_bool(arr.as_view(), &mut ctx);
+        let values = values.execute::(&mut ctx)?;
+
+        let expected_ends = PrimitiveArray::from_iter(vec![2u8, 4, 6, 7, 8]);
+        assert_arrays_eq!(ends, expected_ends);
+        let expected_values =
+            BoolArray::from_iter([Some(true), None, Some(false), None, Some(true)]);
+        assert_arrays_eq!(values, expected_values);
+        Ok(())
+    }
+
+    #[test]
+    fn encode_bool_all_null() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        // An all-invalid input collapses to one null run spanning the whole array.
+        let arr = BoolArray::new(BitBuffer::new_unset(5), Validity::AllInvalid);
+        let (ends, values) = runend_encode_bool(arr.as_view(), &mut ctx);
+        let values = values.execute::(&mut ctx)?;
+
+        let expected_ends = PrimitiveArray::from_iter(vec![5u8]);
+        assert_arrays_eq!(ends, expected_ends);
+        let expected_values = BoolArray::from_iter([Option::::None]);
+        assert_arrays_eq!(values, expected_values);
+        Ok(())
+    }
+
+    #[test]
+    fn encode_bool_empty() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let arr = BoolArray::from_iter(Vec::::new());
+        let (ends, values) = runend_encode_bool(arr.as_view(), &mut ctx);
+        let values = values.execute::(&mut ctx)?;
+
+        assert!(ends.is_empty());
+        assert!(values.is_empty());
+        Ok(())
+    }
+
     #[test]
     fn decode() -> VortexResult<()> {
         let mut ctx = LEGACY_SESSION.create_execution_ctx();
diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs
index ab77f625764..b6d6605b254 100644
--- a/vortex-btrblocks/src/builder.rs
+++ b/vortex-btrblocks/src/builder.rs
@@ -26,6 +26,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
     // Bool schemes.
     ////////////////////////////////////////////////////////////////////////////////////////////////
     &bool::BoolConstantScheme,
+    &bool::BoolRunEndScheme,
     ////////////////////////////////////////////////////////////////////////////////////////////////
     // Integer schemes.
     ////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/vortex-btrblocks/src/canonical_compressor.rs b/vortex-btrblocks/src/canonical_compressor.rs
index 0be774bfbc5..d1ab19c3a86 100644
--- a/vortex-btrblocks/src/canonical_compressor.rs
+++ b/vortex-btrblocks/src/canonical_compressor.rs
@@ -68,14 +68,17 @@ mod tests {
     use vortex_array::arrays::BoolArray;
     use vortex_array::arrays::Constant;
     use vortex_array::arrays::List;
+    use vortex_array::arrays::ListArray;
     use vortex_array::arrays::ListView;
     use vortex_array::arrays::ListViewArray;
+    use vortex_array::arrays::list::ListArrayExt;
     use vortex_array::assert_arrays_eq;
     use vortex_array::session::ArraySession;
     use vortex_array::validity::Validity;
     use vortex_buffer::BitBuffer;
     use vortex_buffer::buffer;
     use vortex_error::VortexResult;
+    use vortex_runend::RunEnd;
     use vortex_session::VortexSession;
 
     use crate::BtrBlocksCompressor;
@@ -191,4 +194,57 @@ mod tests {
         assert_arrays_eq!(compressed, array);
         Ok(())
     }
+
+    #[test]
+    fn test_bool_runend_compressed() -> VortexResult<()> {
+        // Four alternating runs of 128 values each: true, false, true, false.
+        let values = (0..4)
+            .flat_map(|i| {
+                let value = i % 2 == 0;
+                std::iter::repeat_n(value, 128)
+            })
+            .collect::>();
+        let array = BoolArray::new(BitBuffer::from(values), Validity::NonNullable);
+        let btr = BtrBlocksCompressor::default();
+        let compressed = btr.compress(
+            &array.clone().into_array(),
+            &mut SESSION.create_execution_ctx(),
+        )?;
+
+        // Check the chosen encoding, then that the result still compares equal to the input.
+        assert!(compressed.is::());
+        assert_arrays_eq!(compressed, array);
+        Ok(())
+    }
+
+    #[test]
+    fn test_nested_bool_list_runend_compressed() -> VortexResult<()> {
+        // Same leaf data as above: four alternating runs of 128 values each.
+        let leaf_values = (0..4)
+            .flat_map(|i| {
+                let value = i % 2 == 0;
+                std::iter::repeat_n(value, 128)
+            })
+            .collect::>();
+        let leaf = BoolArray::new(BitBuffer::from(leaf_values), Validity::NonNullable);
+        // Offsets split the 512 leaf values into four lists of 128 elements each.
+        let inner = ListArray::try_new(
+            leaf.into_array(),
+            buffer![0u32, 128, 256, 384, 512].into_array(),
+            Validity::NonNullable,
+        )?;
+        // The outer list groups the four inner lists into two lists of two.
+        let outer = ListArray::try_new(
+            inner.into_array(),
+            buffer![0u32, 2, 4].into_array(),
+            Validity::NonNullable,
+        )?;
+
+        let btr = BtrBlocksCompressor::default();
+        let compressed = btr.compress(
+            &outer.clone().into_array(),
+            &mut SESSION.create_execution_ctx(),
+        )?;
+
+        // Walk down through both list levels and check the encoding of the leaf elements.
+        let outer_compressed = compressed.as_::();
+        let inner_compressed = outer_compressed.elements().as_::();
+        assert!(inner_compressed.elements().is::());
+        assert_arrays_eq!(compressed, outer);
+        Ok(())
+    }
 }
diff --git a/vortex-btrblocks/src/schemes/bool.rs b/vortex-btrblocks/src/schemes/bool.rs
index c27251a8599..2f12f746307 100644
--- a/vortex-btrblocks/src/schemes/bool.rs
+++ b/vortex-btrblocks/src/schemes/bool.rs
@@ -3,5 +3,138 @@
 
 //! Bool compression schemes.
 
+use vortex_array::ArrayRef;
+use vortex_array::Canonical;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
 pub use vortex_compressor::builtins::BoolConstantScheme;
+use vortex_compressor::builtins::IntDictScheme;
+use vortex_compressor::estimate::CompressionEstimate;
+use vortex_compressor::estimate::EstimateVerdict;
+use vortex_compressor::scheme::ChildSelection;
+use vortex_compressor::scheme::DescendantExclusion;
 pub use vortex_compressor::stats::BoolStats;
+use vortex_runend::RunEnd;
+use vortex_runend::compress::runend_encode_bool;
+
+use crate::ArrayAndStats;
+use crate::CascadingCompressor;
+use crate::CompressorContext;
+use crate::Scheme;
+use crate::SchemeExt;
+use crate::schemes::integer::IntRLEScheme;
+use crate::schemes::integer::RunEndScheme;
+use crate::schemes::integer::SparseScheme;
+
+/// Run-end encoding for bool arrays.
+///
+/// The array is split into run values (child 0) and run ends (child 1), and each child is handed
+/// back to the cascading compressor.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct BoolRunEndScheme;
+
+/// Minimum ratio of array length to run count required before run-end encoding is considered.
+const BOOL_RUN_END_THRESHOLD: usize = 8;
+
+impl Scheme for BoolRunEndScheme {
+    fn scheme_name(&self) -> &'static str {
+        "vortex.bool.runend"
+    }
+
+    fn matches(&self, canonical: &Canonical) -> bool {
+        matches!(canonical, Canonical::Bool(_))
+    }
+
+    fn num_children(&self) -> usize {
+        // Child 0 is the run values, child 1 is the run ends (see `compress`).
+        2
+    }
+
+    /// Declares the schemes excluded beneath each child: this scheme itself under the values
+    /// child (0), and the dict, run-end, RLE and sparse integer schemes under the ends child (1).
+    fn descendant_exclusions(&self) -> Vec {
+        vec![
+            DescendantExclusion {
+                excluded: BoolRunEndScheme.id(),
+                children: ChildSelection::One(0),
+            },
+            DescendantExclusion {
+                excluded: IntDictScheme.id(),
+                children: ChildSelection::One(1),
+            },
+            DescendantExclusion {
+                excluded: RunEndScheme.id(),
+                children: ChildSelection::One(1),
+            },
+            DescendantExclusion {
+                excluded: IntRLEScheme.id(),
+                children: ChildSelection::One(1),
+            },
+            DescendantExclusion {
+                excluded: SparseScheme.id(),
+                children: ChildSelection::One(1),
+            },
+        ]
+    }
+
+    /// Estimates the ratio between the input's `nbytes` and its run-end encoded size.
+    ///
+    /// Returns a `Skip` verdict when there are no runs, when the array holds fewer than
+    /// `BOOL_RUN_END_THRESHOLD` elements per run, or when the estimated encoded size is not
+    /// smaller than the input.
+    fn expected_compression_ratio(
+        &self,
+        data: &ArrayAndStats,
+        _compress_ctx: CompressorContext,
+        exec_ctx: &mut ExecutionCtx,
+    ) -> CompressionEstimate {
+        let stats = data.bool_stats(exec_ctx);
+        let run_count = stats.run_count() as usize;
+
+        if run_count == 0 || data.array_len() < run_count.saturating_mul(BOOL_RUN_END_THRESHOLD) {
+            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
+        }
+
+        // A validity bitmap is only budgeted for when the input contains nulls.
+        let has_null_runs = stats.null_count() > 0;
+        let before_nbytes = data.array().nbytes();
+        let after_nbytes =
+            estimated_runend_bool_nbytes(data.array_len(), run_count, has_null_runs) as u64;
+
+        if after_nbytes >= before_nbytes {
+            return CompressionEstimate::Verdict(EstimateVerdict::Skip);
+        }
+
+        CompressionEstimate::Verdict(EstimateVerdict::Ratio(
+            before_nbytes as f64 / after_nbytes as f64,
+        ))
+    }
+
+    /// Run-end encodes the bool array, then compresses the values (child 0) and the ends
+    /// (child 1) before assembling the run-end array.
+    fn compress(
+        &self,
+        compressor: &CascadingCompressor,
+        data: &ArrayAndStats,
+        compress_ctx: CompressorContext,
+        exec_ctx: &mut ExecutionCtx,
+    ) -> vortex_error::VortexResult {
+        let (ends, values) = runend_encode_bool(data.array_as_bool(), exec_ctx);
+
+        let compressed_values =
+            compressor.compress_child(&values, &compress_ctx, self.id(), 0, exec_ctx)?;
+        let compressed_ends =
+            compressor.compress_child(&ends.into_array(), &compress_ctx, self.id(), 1, exec_ctx)?;
+
+        // SAFETY: ends and values come from `runend_encode_bool` on this array; compression
+        // doesn't affect invariants.
+        Ok(unsafe {
+            RunEnd::new_unchecked(compressed_ends, compressed_values, 0, data.array_len())
+                .into_array()
+        })
+    }
+}
+
+/// Estimates the byte size of a run-end encoded bool array of `len` elements and `run_count` runs.
+///
+/// Counts one end per run at the width given by `run_end_width(len)`, one value bit per run and,
+/// when `nullable`, one validity bit per run. Both bitmaps are rounded up to whole bytes.
+fn estimated_runend_bool_nbytes(len: usize, run_count: usize, nullable: bool) -> usize {
+    let ends_nbytes = run_count * run_end_width(len);
+    let values_nbytes = run_count.div_ceil(8);
+    let validity_nbytes = if nullable { run_count.div_ceil(8) } else { 0 };
+
+    ends_nbytes + values_nbytes + validity_nbytes
+}
+
+/// Picks a byte width for run ends by testing whether `len` fits in `u8`, then `u16`, then `u32`,
+/// with a final fallback.
+// NOTE(review): the `size_of` type arguments are not visible in this text; presumably they are
+// `u8`, `u16`, `u32` and `u64` in that order - confirm against the real file.
+fn run_end_width(len: usize) -> usize {
+    if u8::try_from(len).is_ok() {
+        size_of::()
+    } else if u16::try_from(len).is_ok() {
+        size_of::()
+    } else if u32::try_from(len).is_ok() {
+        size_of::()
+    } else {
+        size_of::()
+    }
+}
diff --git a/vortex-compressor/src/stats/bool.rs b/vortex-compressor/src/stats/bool.rs
index 8825ec8a7f6..096d545ffea 100644
--- a/vortex-compressor/src/stats/bool.rs
+++ b/vortex-compressor/src/stats/bool.rs
@@ -6,8 +6,10 @@
 use vortex_array::ExecutionCtx;
 use vortex_array::arrays::BoolArray;
 use vortex_array::arrays::bool::BoolArrayExt;
+use vortex_buffer::BitBuffer;
 use vortex_error::VortexResult;
 use vortex_mask::AllOr;
+use vortex_mask::Mask;
 
 /// Array of booleans and relevant stats for compression.
 #[derive(Clone, Debug)]
@@ -18,6 +20,10 @@ pub struct BoolStats {
     value_count: u32,
     /// Number of `true` values among valid (non-null) elements.
     true_count: u32,
+    /// Number of logical runs, treating null as a distinct value.
+    run_count: u32,
+    /// Average logical run length: the input length divided by `run_count`, rounded down.
+    average_run_length: u32,
 }
 
 impl BoolStats {
@@ -32,6 +38,8 @@ impl BoolStats {
                 null_count: 0,
                 value_count: 0,
                 true_count: 0,
+                run_count: 0,
+                average_run_length: 0,
             });
         }
 
@@ -40,6 +48,8 @@ impl BoolStats {
                 null_count: u32::try_from(input.len())?,
                 value_count: 0,
                 true_count: 0,
+                run_count: 1,
+                average_run_length: u32::try_from(input.len())?,
             });
         }
 
@@ -62,10 +72,14 @@ impl BoolStats {
             }
         };
 
+        let run_count = bool_run_count(&bits, &validity);
+
         Ok(Self {
             null_count: u32::try_from(null_count)?,
             value_count: u32::try_from(value_count)?,
             true_count: u32::try_from(true_count)?,
+            run_count: u32::try_from(run_count)?,
+            average_run_length: u32::try_from(input.len() / run_count)?,
         })
     }
 
@@ -84,12 +98,70 @@ impl BoolStats {
         self.true_count
     }
 
+    /// Returns the number of logical runs, treating null as a distinct value.
+    pub fn run_count(&self) -> u32 {
+        self.run_count
+    }
+
+    /// Returns the average logical run length.
+    pub fn average_run_length(&self) -> u32 {
+        self.average_run_length
+    }
+
     /// Returns `true` if all valid values are the same (all-true or all-false).
     pub fn is_constant(&self) -> bool {
         self.value_count > 0 && (self.true_count == 0 || self.true_count == self.value_count)
     }
 }
 
+/// Counts the logical runs in `bits`, treating null as a distinct value.
+///
+/// When `validity` is all-true only the bit values are compared; otherwise each bit is paired
+/// with its validity bit and compared as an optional value.
+fn bool_run_count(bits: &BitBuffer, validity: &Mask) -> usize {
+    if validity.all_true() {
+        return bool_value_run_count(bits.iter());
+    }
+
+    let validity_bits = validity.to_bit_buffer();
+    optional_bool_run_count(
+        bits.iter()
+            .zip(validity_bits.iter())
+            .map(|(value, is_valid)| is_valid.then_some(value)),
+    )
+}
+
+/// Counts runs of equal adjacent items; an empty iterator has 0 runs.
+fn bool_value_run_count(mut iter: impl Iterator) -> usize {
+    let Some(mut previous) = iter.next() else {
+        return 0;
+    };
+
+    let mut runs = 1;
+    for value in iter {
+        if value != previous {
+            previous = value;
+            runs += 1;
+        }
+    }
+
+    runs
+}
+
+/// Count bool/null runs.
+///
+/// A new run starts whenever an item differs from its predecessor; an empty iterator has 0 runs.
+fn optional_bool_run_count(mut iter: impl Iterator>) -> usize {
+    let Some(mut previous) = iter.next() else {
+        return 0;
+    };
+
+    // The first item opens the first run; every later change of value opens another.
+    let mut runs = 1;
+    for value in iter {
+        if value != previous {
+            previous = value;
+            runs += 1;
+        }
+    }
+
+    runs
+}
+
 #[cfg(test)]
 mod tests {
     use vortex_array::LEGACY_SESSION;
@@ -112,6 +184,8 @@ mod tests {
         assert_eq!(stats.value_count, 3);
         assert_eq!(stats.null_count, 0);
         assert_eq!(stats.true_count, 3);
+        assert_eq!(stats.run_count, 1);
+        assert_eq!(stats.average_run_length, 3);
         assert!(stats.is_constant());
         Ok(())
     }
@@ -127,6 +201,8 @@ mod tests {
         assert_eq!(stats.value_count, 3);
         assert_eq!(stats.null_count, 0);
         assert_eq!(stats.true_count, 0);
+        assert_eq!(stats.run_count, 1);
+        assert_eq!(stats.average_run_length, 3);
         assert!(stats.is_constant());
         Ok(())
     }
@@ -142,6 +218,8 @@ mod tests {
         assert_eq!(stats.value_count, 3);
         assert_eq!(stats.null_count, 0);
         assert_eq!(stats.true_count, 2);
+        assert_eq!(stats.run_count, 3);
+        assert_eq!(stats.average_run_length, 1);
         assert!(!stats.is_constant());
         Ok(())
     }
@@ -157,6 +235,8 @@ mod tests {
         assert_eq!(stats.value_count, 2);
         assert_eq!(stats.null_count, 1);
         assert_eq!(stats.true_count, 2);
+        assert_eq!(stats.run_count, 3);
+        assert_eq!(stats.average_run_length, 1);
         assert!(stats.is_constant());
         Ok(())
     }
diff --git a/vortex-compressor/src/stats/cache.rs b/vortex-compressor/src/stats/cache.rs
index 6f7020191a1..0278cf6b05d 100644
--- a/vortex-compressor/src/stats/cache.rs
+++ b/vortex-compressor/src/stats/cache.rs
@@ -131,6 +131,17 @@ impl ArrayAndStats {
             .vortex_expect("the array is guaranteed to already be canonical by construction")
     }
 
+    /// Returns the array as an [`ArrayView<Bool>`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if the array is not a bool array.
+    pub fn array_as_bool(&self) -> ArrayView<'_, Bool> {
+        self.array
+            .as_opt::()
+            .vortex_expect("the array is guaranteed to already be canonical by construction")
+    }
+
     /// Returns the array as an [`ArrayView`].
     ///
     /// # Panics
diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs
index 8668de339cb..4e96973e5cb 100644
--- a/vortex/src/lib.rs
+++ b/vortex/src/lib.rs
@@ -39,7 +39,12 @@ pub mod compressor {
     pub use vortex_btrblocks::BtrBlocksCompressor;
     pub use vortex_btrblocks::BtrBlocksCompressorBuilder;
     pub use vortex_btrblocks::Scheme;
+    pub use vortex_btrblocks::SchemeExt;
     pub use vortex_btrblocks::SchemeId;
+
+    /// Re-exports everything from `vortex_btrblocks::schemes`.
+    pub mod schemes {
+        pub use vortex_btrblocks::schemes::*;
+    }
 }
 
 pub mod dtype {