diff --git a/parquet-testing b/parquet-testing index a3d96a65e11e..b49107d5e742 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3 +Subproject commit b49107d5e7426ccd657965dbf85fcac430812d64 diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index ba8ffc2e92c3..b251058bd6e9 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -634,6 +634,10 @@ enum Encoding { /// afterwards. Note that the use of this encoding with FIXED_LEN_BYTE_ARRAY(N) data may /// perform poorly for large values of N. BYTE_STREAM_SPLIT = 9; + /// Adaptive Lossless floating-Point encoding (ALP). + /// + /// Currently specified for FLOAT and DOUBLE. + ALP = 10; } ); @@ -654,6 +658,7 @@ impl FromStr for Encoding { "DELTA_BYTE_ARRAY" | "delta_byte_array" => Ok(Encoding::DELTA_BYTE_ARRAY), "RLE_DICTIONARY" | "rle_dictionary" => Ok(Encoding::RLE_DICTIONARY), "BYTE_STREAM_SPLIT" | "byte_stream_split" => Ok(Encoding::BYTE_STREAM_SPLIT), + "ALP" | "alp" => Ok(Encoding::ALP), _ => Err(general_err!("unknown encoding: {}", s)), } } @@ -791,6 +796,7 @@ fn i32_to_encoding(val: i32) -> Encoding { 7 => Encoding::DELTA_BYTE_ARRAY, 8 => Encoding::RLE_DICTIONARY, 9 => Encoding::BYTE_STREAM_SPLIT, + 10 => Encoding::ALP, _ => panic!("Impossible encoding {val}"), } } @@ -2137,6 +2143,7 @@ mod tests { ); assert_eq!(Encoding::DELTA_BYTE_ARRAY.to_string(), "DELTA_BYTE_ARRAY"); assert_eq!(Encoding::RLE_DICTIONARY.to_string(), "RLE_DICTIONARY"); + assert_eq!(Encoding::ALP.to_string(), "ALP"); } #[test] @@ -2438,6 +2445,8 @@ mod tests { assert_eq!(encoding, Encoding::RLE_DICTIONARY); encoding = "BYTE_STREAM_SPLIT".parse().unwrap(); assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT); + encoding = "alp".parse().unwrap(); + assert_eq!(encoding, Encoding::ALP); // test lowercase encoding = "byte_stream_split".parse().unwrap(); @@ -2573,6 +2582,7 @@ mod tests { Encoding::PLAIN_DICTIONARY, Encoding::RLE_DICTIONARY, Encoding::BYTE_STREAM_SPLIT, + Encoding::ALP, ]; encodings_roundtrip(encodings.into()); } diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 58430820a9b6..dce631930975 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -26,6 +26,7 @@ use super::rle::RleDecoder; use crate::basic::*; use crate::data_type::private::ParquetValueType; use crate::data_type::*; +use crate::encodings::decoding::alp::AlpDecoder; use crate::encodings::decoding::byte_stream_split_decoder::{ ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder, }; @@ -33,6 +34,7 @@ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{self, BitReader}; +mod alp; mod byte_stream_split_decoder; pub(crate) mod private { @@ -63,7 +65,8 @@ pub(crate) mod private { Encoding::RLE | Encoding::DELTA_BINARY_PACKED | Encoding::DELTA_BYTE_ARRAY - | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!( + | Encoding::DELTA_LENGTH_BYTE_ARRAY + | Encoding::ALP => Err(general_err!( "Encoding {} is not supported for type", encoding )), @@ -116,6 +119,7 @@ pub(crate) mod private { ) -> Result>> { match encoding { Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())), + Encoding::ALP => Ok(Box::new(AlpDecoder::new())), _ => get_decoder_default(descr, encoding), } } @@ -127,6 +131,7 @@ pub(crate) mod private { ) -> Result>> { match encoding { Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())), + Encoding::ALP => Ok(Box::new(AlpDecoder::new())), _ => get_decoder_default(descr, encoding), } } @@ -1135,6 +1140,8 @@ mod tests { create_and_check_decoder::(Encoding::DELTA_LENGTH_BYTE_ARRAY, None); create_and_check_decoder::(Encoding::DELTA_BYTE_ARRAY, None); create_and_check_decoder::(Encoding::RLE, None); + create_and_check_decoder::(Encoding::ALP, None); + create_and_check_decoder::(Encoding::ALP, None); // error when initializing create_and_check_decoder::( diff --git a/parquet/src/encodings/decoding/alp.rs b/parquet/src/encodings/decoding/alp.rs new file mode 100644 index 000000000000..eb76a27536ce --- /dev/null +++ b/parquet/src/encodings/decoding/alp.rs @@ -0,0 +1,1524 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; +use std::ops::Range; + +use bytes::Bytes; + +use crate::basic::Encoding; +use crate::data_type::DataType; +use crate::encodings::decoding::Decoder; +use crate::errors::{ParquetError, Result}; +use crate::util::bit_util::{BitReader, FromBytes}; + +const ALP_HEADER_SIZE: usize = 7; +const ALP_COMPRESSION_MODE: u8 = 0; +const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0; +const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3; +const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15; +const ALP_MAX_EXPONENT_F32: u8 = 10; +const ALP_MAX_EXPONENT_F64: u8 = 18; + +/// Page-level ALP header (7 bytes). +/// +/// Layout in bytes: +/// - `[0]` `compression_mode` +/// - `[1]` `integer_encoding` +/// - `[2]` `log_vector_size` +/// - `[3..7]` `num_elements` (little-endian `i32`) +#[derive(Debug, Clone, Copy)] +struct AlpHeader { + compression_mode: u8, + integer_encoding: u8, + log_vector_size: u8, + num_elements: i32, +} + +impl AlpHeader { + fn num_elements_usize(&self) -> usize { + self.num_elements as usize + } + + fn vector_size(&self) -> usize { + 1usize << self.log_vector_size + } + + fn num_vectors(&self) -> usize { + if self.num_elements == 0 { + 0 + } else { + self.num_elements_usize().div_ceil(self.vector_size()) + } + } + + fn vector_num_elements(&self, vector_index: usize) -> u16 { + let vector_size = self.vector_size(); + let num_full_vectors = self.num_elements_usize() / vector_size; + let remainder = self.num_elements_usize() % vector_size; + if vector_index < num_full_vectors { + vector_size as u16 + } else if vector_index == num_full_vectors && remainder > 0 { + remainder as u16 + } else { + 0 + } + } +} + +/// Per-vector ALP metadata (4 bytes), equivalent to C++ `AlpEncodedVectorInfo`. +#[derive(Debug, Clone, Copy)] +struct AlpEncodedVectorInfo { + exponent: u8, + factor: u8, + num_exceptions: u16, +} + +impl AlpEncodedVectorInfo { + const STORED_SIZE: usize = 4; +} + +/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for `f64`). +#[derive(Debug, Clone, Copy)] +struct AlpEncodedForVectorInfo { + frame_of_reference: Exact, + bit_width: u8, +} + +impl AlpEncodedForVectorInfo { + fn stored_size() -> usize { + Exact::WIDTH + 1 + } + + fn get_bit_packed_size(&self, num_elements: u16) -> usize { + (self.bit_width as usize * num_elements as usize).div_ceil(8) + } + + fn get_data_stored_size(&self, num_elements: u16, num_exceptions: u16) -> usize { + let bit_packed_size = self.get_bit_packed_size(num_elements); + bit_packed_size + + num_exceptions as usize * std::mem::size_of::() + + num_exceptions as usize * Exact::WIDTH + } +} + +/// Parsed view of one vector's metadata and data slices. +/// +/// `packed_values` is a zero-copy range into page body bytes. +/// Exception positions/values are copied for straightforward decode handling. +#[derive(Debug)] +struct AlpEncodedVectorView { + num_elements: u16, + alp_info: AlpEncodedVectorInfo, + for_info: AlpEncodedForVectorInfo, + packed_values: Range, + exception_positions: Vec, + exception_values: Vec, +} + +impl AlpEncodedVectorView { + fn expected_stored_size(&self) -> usize { + AlpEncodedVectorInfo::STORED_SIZE + + AlpEncodedForVectorInfo::::stored_size() + + self + .for_info + .get_data_stored_size(self.num_elements, self.alp_info.num_exceptions) + } +} + +/// Parsed ALP page layout for one exact integer width (`u32` for float pages, +/// `u64` for double pages). +#[derive(Debug)] +struct AlpPageLayout { + header: AlpHeader, + body: Bytes, + vectors: Vec>, +} + +/// Exact integer type used by FOR reconstruction. +/// +/// This mirrors C++: +/// - `float` -> `uint32_t` +/// - `double` -> `uint64_t` +/// +/// Why unsigned (not `i32`/`i64`)? +/// - FOR stores non-negative deltas optimized for bitpacking. +/// - Unsigned arithmetic avoids signed-overflow edge cases in FOR stage. +/// - Signed interpretation is applied later during decimal reconstruction. +pub(super) trait AlpExact: Copy + std::fmt::Debug { + const WIDTH: usize; + type Signed: Copy; + fn from_le_slice(slice: &[u8]) -> Self; + fn zero() -> Self; + fn wrapping_add(self, rhs: Self) -> Self; + fn reinterpret_as_signed(self) -> Self::Signed; +} + +impl AlpExact for u32 { + const WIDTH: usize = 4; + type Signed = i32; + + fn from_le_slice(slice: &[u8]) -> Self { + u32::from_le_bytes([slice[0], slice[1], slice[2], slice[3]]) + } + + fn zero() -> Self { + 0 + } + + fn wrapping_add(self, rhs: Self) -> Self { + self.wrapping_add(rhs) + } + + fn reinterpret_as_signed(self) -> Self::Signed { + i32::from_ne_bytes(self.to_ne_bytes()) + } +} + +impl AlpExact for u64 { + const WIDTH: usize = 8; + type Signed = i64; + + fn from_le_slice(slice: &[u8]) -> Self { + u64::from_le_bytes([ + slice[0], slice[1], slice[2], slice[3], slice[4], slice[5], slice[6], slice[7], + ]) + } + + fn zero() -> Self { + 0 + } + + fn wrapping_add(self, rhs: Self) -> Self { + self.wrapping_add(rhs) + } + + fn reinterpret_as_signed(self) -> Self::Signed { + i64::from_ne_bytes(self.to_ne_bytes()) + } +} + +const ALP_POW10_F32: [f32; 11] = [ + 1.0, + 10.0, + 100.0, + 1000.0, + 10000.0, + 100000.0, + 1000000.0, + 10000000.0, + 100000000.0, + 1000000000.0, + 10000000000.0, +]; + +const ALP_POW10_F64: [f64; 19] = [ + 1.0, + 10.0, + 100.0, + 1000.0, + 10000.0, + 100000.0, + 1000000.0, + 10000000.0, + 100000000.0, + 1000000000.0, + 10000000000.0, + 100000000000.0, + 1000000000000.0, + 10000000000000.0, + 100000000000000.0, + 1000000000000000.0, + 10000000000000000.0, + 100000000000000000.0, + 1000000000000000000.0, +]; + +const ALP_NEG_POW10_F32: [f32; 11] = [ + 1.0, + 0.1, + 0.01, + 0.001, + 0.0001, + 0.00001, + 0.000001, + 0.0000001, + 0.00000001, + 0.000000001, + 0.0000000001, +]; + +const ALP_NEG_POW10_F64: [f64; 19] = [ + 1.0, + 0.1, + 0.01, + 0.001, + 0.0001, + 0.00001, + 0.000001, + 0.0000001, + 0.00000001, + 0.000000001, + 0.0000000001, + 0.00000000001, + 0.000000000001, + 0.0000000000001, + 0.00000000000001, + 0.000000000000001, + 0.0000000000000001, + 0.00000000000000001, + 0.000000000000000001, +]; + +pub(super) trait AlpFloat: Copy + Default { + type Exact: AlpExact + FromBytes; + type Scale: Copy; + + /// Precompute vector-level ALP decimal scale constants for: + /// `value = (encoded * 10^(factor)) * 10^(-exponent)`. + /// + /// Preconditions are validated during page parse. + fn decode_scale(exponent: u8, factor: u8) -> Self::Scale; + + /// Decode one signed exact integer using a precomputed two-step scale. + fn decode_value(signed_encoded: ::Signed, scale: Self::Scale) -> Self; + + fn from_exact_bits(bits: Self::Exact) -> Self; +} + +impl AlpFloat for f32 { + type Exact = u32; + type Scale = (f32, f32); + + fn decode_scale(exponent: u8, factor: u8) -> Self::Scale { + debug_assert!(exponent <= ALP_MAX_EXPONENT_F32); + debug_assert!(factor <= exponent); + ( + ALP_POW10_F32[factor as usize], + ALP_NEG_POW10_F32[exponent as usize], + ) + } + + fn decode_value(signed_encoded: i32, scale: Self::Scale) -> Self { + ((signed_encoded as f32) * scale.0) * scale.1 + } + + fn from_exact_bits(bits: Self::Exact) -> Self { + f32::from_bits(bits) + } +} + +impl AlpFloat for f64 { + type Exact = u64; + type Scale = (f64, f64); + + fn decode_scale(exponent: u8, factor: u8) -> Self::Scale { + debug_assert!(exponent <= ALP_MAX_EXPONENT_F64); + debug_assert!(factor <= exponent); + ( + ALP_POW10_F64[factor as usize], + ALP_NEG_POW10_F64[exponent as usize], + ) + } + + fn decode_value(signed_encoded: i64, scale: Self::Scale) -> Self { + ((signed_encoded as f64) * scale.0) * scale.1 + } + + fn from_exact_bits(bits: Self::Exact) -> Self { + f64::from_bits(bits) + } +} + +/// Parse and validate a full ALP-encoded page body. +/// +/// Validation includes: +/// - header fields/encoding +/// - non-negative `num_elements` +/// - offsets bounds + monotonicity +/// - per-vector metadata/data section lengths +fn parse_alp_page_layout(data: Bytes) -> Result> { + let data_ref = data.as_ref(); + if data_ref.len() < ALP_HEADER_SIZE { + return Err(general_err!( + "Invalid ALP page: expected at least {} bytes for header, got {}", + ALP_HEADER_SIZE, + data_ref.len() + )); + } + + let header = AlpHeader { + compression_mode: data_ref[0], + integer_encoding: data_ref[1], + log_vector_size: data_ref[2], + num_elements: i32::from_le_bytes([data_ref[3], data_ref[4], data_ref[5], data_ref[6]]), + }; + + if header.compression_mode != ALP_COMPRESSION_MODE { + return Err(general_err!( + "Invalid ALP page: unsupported compression mode {}", + header.compression_mode + )); + } + + if header.integer_encoding != ALP_INTEGER_ENCODING_FOR_BIT_PACK { + return Err(general_err!( + "Invalid ALP page: unsupported integer encoding {}", + header.integer_encoding + )); + } + + if header.log_vector_size < ALP_MIN_LOG_VECTOR_SIZE { + return Err(general_err!( + "Invalid ALP page: log_vector_size {} below min {}", + header.log_vector_size, + ALP_MIN_LOG_VECTOR_SIZE + )); + } + + if header.log_vector_size > ALP_MAX_LOG_VECTOR_SIZE { + return Err(general_err!( + "Invalid ALP page: log_vector_size {} exceeds max {}", + header.log_vector_size, + ALP_MAX_LOG_VECTOR_SIZE + )); + } + + if header.num_elements < 0 { + return Err(general_err!( + "Invalid ALP page: num_elements {} must be >= 0", + header.num_elements + )); + } + + let num_vectors = header.num_vectors(); + + let offsets_len = num_vectors + .checked_mul(std::mem::size_of::()) + .ok_or_else(|| general_err!("Invalid ALP page: offsets length overflow"))?; + let offsets_end = ALP_HEADER_SIZE + .checked_add(offsets_len) + .ok_or_else(|| general_err!("Invalid ALP page: header + offsets length overflow"))?; + + if data_ref.len() < offsets_end { + return Err(general_err!( + "Invalid ALP page: expected at least {} bytes for {} offsets, got {}", + offsets_end, + num_vectors, + data_ref.len() + )); + } + + let body = data.slice(ALP_HEADER_SIZE..); + let body_ref = body.as_ref(); + let body_len = body_ref.len(); + let offsets_section_size = num_vectors * std::mem::size_of::(); + + let mut offsets = Vec::with_capacity(num_vectors); + for i in 0..num_vectors { + let start = ALP_HEADER_SIZE + i * 4; + let offset = u32::from_le_bytes([ + data_ref[start], + data_ref[start + 1], + data_ref[start + 2], + data_ref[start + 3], + ]); + + if offset as usize >= body_len { + return Err(general_err!( + "Invalid ALP page: vector offset {} out of bounds for body length {}", + offset, + body_len + )); + } + + if (offset as usize) < offsets_section_size { + return Err(general_err!( + "Invalid ALP page: vector offset {} points into offsets section {}", + offset, + offsets_section_size + )); + } + + offsets.push(offset); + } + + let mut vectors = Vec::with_capacity(num_vectors); + let mut expected_next_offset = offsets_section_size; + for (vector_idx, vector_offset) in offsets.iter().enumerate() { + let vector_start = *vector_offset as usize; + if vector_start != expected_next_offset { + return Err(general_err!( + "Invalid ALP page: vector offset {} at index {} does not match expected {}", + vector_start, + vector_idx, + expected_next_offset + )); + } + + let vector_end = if vector_idx + 1 < offsets.len() { + offsets[vector_idx + 1] as usize + } else { + body_len + }; + + if vector_end < vector_start { + return Err(general_err!( + "Invalid ALP page: vector offsets are not monotonic at index {}", + vector_idx + )); + } + + let vector_num_elements = header.vector_num_elements(vector_idx); + let vector = + parse_vector_view::(body_ref, vector_start, vector_end, vector_num_elements)?; + expected_next_offset = vector_start + .checked_add(vector.expected_stored_size()) + .ok_or_else(|| { + general_err!("Invalid ALP page: expected next vector offset overflow") + })?; + vectors.push(vector); + } + + if expected_next_offset != body_len { + return Err(general_err!( + "Invalid ALP page: body size {} does not match expected {} (offsets + vectors)", + body_len, + expected_next_offset + )); + } + + Ok(AlpPageLayout { + header, + body, + vectors, + }) +} + +/// Parse a single vector section: +/// `[AlpInfo][ForInfo][PackedValues][ExceptionPositions][ExceptionValues]`. +fn parse_vector_view( + body: &[u8], + vector_start: usize, + vector_end: usize, + num_elements: u16, +) -> Result> { + let vector_bytes = &body[vector_start..vector_end]; + + let metadata_size = + AlpEncodedVectorInfo::STORED_SIZE + AlpEncodedForVectorInfo::::stored_size(); + if vector_bytes.len() < metadata_size { + return Err(general_err!( + "Invalid ALP page: vector metadata too short, expected at least {} bytes, got {}", + metadata_size, + vector_bytes.len() + )); + } + + let alp_info = AlpEncodedVectorInfo { + exponent: vector_bytes[0], + factor: vector_bytes[1], + num_exceptions: u16::from_le_bytes([vector_bytes[2], vector_bytes[3]]), + }; + + let max_exponent = if Exact::WIDTH == 4 { + ALP_MAX_EXPONENT_F32 + } else { + ALP_MAX_EXPONENT_F64 + }; + + if alp_info.exponent > max_exponent { + return Err(general_err!( + "Invalid ALP page: exponent {} exceeds max {}", + alp_info.exponent, + max_exponent + )); + } + + if alp_info.factor > alp_info.exponent { + return Err(general_err!( + "Invalid ALP page: factor {} exceeds exponent {}", + alp_info.factor, + alp_info.exponent + )); + } + + if alp_info.num_exceptions > num_elements { + return Err(general_err!( + "Invalid ALP page: num_exceptions {} exceeds vector num_elements {}", + alp_info.num_exceptions, + num_elements + )); + } + + let for_start = AlpEncodedVectorInfo::STORED_SIZE; + let for_end = for_start + Exact::WIDTH; + let frame_of_reference = Exact::from_le_slice(&vector_bytes[for_start..for_end]); + let bit_width = vector_bytes[for_end]; + + if bit_width as usize > Exact::WIDTH * 8 { + return Err(general_err!( + "Invalid ALP page: bit width {} exceeds {}", + bit_width, + Exact::WIDTH * 8 + )); + } + + let for_info = AlpEncodedForVectorInfo:: { + frame_of_reference, + bit_width, + }; + + let data_size = for_info.get_data_stored_size(num_elements, alp_info.num_exceptions); + let expected_size = metadata_size + data_size; + if vector_bytes.len() < expected_size { + return Err(general_err!( + "Invalid ALP page: vector data too short, expected at least {} bytes, got {}", + expected_size, + vector_bytes.len() + )); + } + if vector_bytes.len() > expected_size { + return Err(general_err!( + "Invalid ALP page: vector data too long, expected {} bytes, got {}", + expected_size, + vector_bytes.len() + )); + } + + let data = &vector_bytes[metadata_size..expected_size]; + let packed_size = for_info.get_bit_packed_size(num_elements); + let positions_size = alp_info.num_exceptions as usize * std::mem::size_of::(); + let values_size = alp_info.num_exceptions as usize * Exact::WIDTH; + + let packed_start = 0; + let packed_end = packed_start + packed_size; + let positions_start = packed_end; + let positions_end = positions_start + positions_size; + let values_start = positions_end; + let values_end = values_start + values_size; + + let mut exception_positions = Vec::with_capacity(alp_info.num_exceptions as usize); + for chunk in data[positions_start..positions_end].chunks_exact(2) { + let position = u16::from_le_bytes([chunk[0], chunk[1]]); + if position >= num_elements { + return Err(general_err!( + "Invalid ALP page: exception position {} out of bounds for vector length {}", + position, + num_elements + )); + } + exception_positions.push(position); + } + + let packed_values = + (vector_start + metadata_size + packed_start)..(vector_start + metadata_size + packed_end); + + let mut exception_values = Vec::with_capacity(alp_info.num_exceptions as usize); + for chunk in data[values_start..values_end].chunks_exact(Exact::WIDTH) { + exception_values.push(Exact::from_le_slice(chunk)); + } + + Ok(AlpEncodedVectorView { + num_elements, + alp_info, + for_info, + packed_values, + exception_positions, + exception_values, + }) +} + +/// Decode bit-packed deltas into exact integers. +fn bit_unpack_integers( + packed_values: Bytes, + bit_width: u8, + num_elements: u16, +) -> Result> { + if bit_width as usize > Exact::WIDTH * 8 { + return Err(general_err!( + "Invalid ALP page: bit width {} exceeds {}", + bit_width, + Exact::WIDTH * 8 + )); + } + + if bit_width == 0 { + return Ok(vec![Exact::zero(); num_elements as usize]); + } + + let mut out = vec![Exact::zero(); num_elements as usize]; + let mut reader = BitReader::new(packed_values); + let read = reader.get_batch::(&mut out, bit_width as usize); + if read != out.len() { + return Err(general_err!( + "Invalid ALP page: bit unpack read {} values, expected {}", + read, + out.len() + )); + } + + Ok(out) +} + +/// Apply inverse FOR: `decoded = delta + frame_of_reference`. +fn inverse_for(deltas: &mut [Exact], frame_of_reference: Exact) { + for value in deltas { + *value = value.wrapping_add(frame_of_reference); + } +} + +/// Decode one vector into output floating values: +/// bit-unpack -> inverse FOR -> decimal decode -> patch exceptions. +fn decode_vector_values( + body: &Bytes, + vector: &AlpEncodedVectorView, +) -> Result> { + let mut exact_values = bit_unpack_integers( + body.slice(vector.packed_values.clone()), + vector.for_info.bit_width, + vector.num_elements, + )?; + inverse_for(&mut exact_values, vector.for_info.frame_of_reference); + + let scale = Value::decode_scale(vector.alp_info.exponent, vector.alp_info.factor); + + let mut out = Vec::with_capacity(vector.num_elements as usize); + for exact_value in exact_values { + let signed_value = exact_value.reinterpret_as_signed(); + out.push(Value::decode_value(signed_value, scale)); + } + + if vector.exception_positions.len() != vector.exception_values.len() { + return Err(general_err!( + "Invalid ALP page: exception positions ({}) and values ({}) length mismatch", + vector.exception_positions.len(), + vector.exception_values.len() + )); + } + + for (pos, value_bits) in vector + .exception_positions + .iter() + .zip(vector.exception_values.iter()) + { + let pos = *pos as usize; + if pos >= out.len() { + return Err(general_err!( + "Invalid ALP page: exception position {} out of bounds for vector length {}", + pos, + out.len() + )); + } + out[pos] = Value::from_exact_bits(*value_bits); + } + + Ok(out) +} + +fn decode_page_values(layout: &AlpPageLayout) -> Result> { + let total = layout.header.num_elements_usize(); + let mut out = vec![Value::default(); total]; + decode_page_values_into::(layout, &mut out)?; + Ok(out) +} + +/// Decode a full ALP page into an existing output slice. +/// +/// This walks pre-parsed vector views from `layout` and decodes into `out`. +fn decode_page_values_into( + layout: &AlpPageLayout, + out: &mut [Value], +) -> Result<()> { + let total = layout.header.num_elements_usize(); + if out.len() < total { + return Err(general_err!( + "Invalid ALP decode output: output length {} smaller than page values {}", + out.len(), + total + )); + } + + let mut output_offset = 0usize; + for vector in &layout.vectors { + let vector_values = decode_vector_values::(&layout.body, vector)?; + let next_offset = output_offset + vector_values.len(); + out[output_offset..next_offset].copy_from_slice(&vector_values); + output_offset = next_offset; + } + if output_offset != total { + return Err(general_err!( + "Invalid ALP decode output: decoded {} values, expected {}", + output_offset, + total + )); + } + Ok(()) +} + +/// Decoder for ALP-encoded floating-point pages (`f32`/`f64`). +/// +/// Current behavior: +/// - `set_data` parses + validates page metadata and stores ALP layout state. +/// - `get` uses a fast path to decode directly to output when all values are requested. +/// - otherwise, `get` lazily decodes the full page once, then copies from decoded buffer. +/// - `skip` advances the decoded cursor. +pub(crate) struct AlpDecoder +where + T::T: AlpFloat, + ::Exact: Send, +{ + layout: Option::Exact>>, + decoded_values: Vec, + current_offset: usize, + needs_decode: bool, + num_values: usize, + _marker: PhantomData, +} + +impl AlpDecoder +where + T::T: AlpFloat, + ::Exact: Send, +{ + pub(crate) fn new() -> Self { + Self { + layout: None, + decoded_values: Vec::new(), + current_offset: 0, + needs_decode: false, + num_values: 0, + _marker: PhantomData, + } + } + + /// Decode the stored page into `decoded_values` if it hasn't been decoded yet. + /// + /// Used by partial `get` / `skip` paths that need a stable decoded buffer. + fn ensure_decoded(&mut self) -> Result<()> { + if !self.needs_decode { + return Ok(()); + } + + let layout = self.layout.take().ok_or_else(|| { + general_err!("Invalid ALP decoder state: set_data must be called before get/skip") + })?; + + self.decoded_values = decode_page_values::(&layout)?; + self.needs_decode = false; + + Ok(()) + } +} + +impl Decoder for AlpDecoder +where + T::T: AlpFloat, + ::Exact: Send, +{ + /// Store validated page layout and reset read cursor state. + /// + /// Actual value decoding is deferred until first `get`/`skip`. + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { + let layout = parse_alp_page_layout::<::Exact>(data)?; + + if layout.header.num_elements_usize() != num_values { + return Err(general_err!( + "Invalid ALP page: header num_elements {} does not match page num_values {}", + layout.header.num_elements, + num_values + )); + } + + self.layout = Some(layout); + self.decoded_values.clear(); + self.current_offset = 0; + self.needs_decode = num_values > 0; + self.num_values = num_values; + Ok(()) + } + + /// Read up to `buffer.len()` decoded values. + /// + /// Fast path: if caller requests all remaining values from a fresh page, + /// decode directly into `buffer` (matching C++ ALP decoder behavior). + /// Otherwise decode once into internal storage and copy slices from there. + fn get(&mut self, buffer: &mut [T::T]) -> Result { + let target = buffer.len().min(self.num_values); + if target == 0 { + return Ok(0); + } + + // C++ parity fast path: decode directly into caller output when it asks + // for all remaining values from a fresh page. + if self.needs_decode && target == self.num_values { + let layout = self.layout.take().ok_or_else(|| { + general_err!("Invalid ALP decoder state: set_data must be called before get/skip") + })?; + decode_page_values_into::(&layout, &mut buffer[..target])?; + self.needs_decode = false; + self.decoded_values.clear(); + self.current_offset = 0; + self.num_values = 0; + return Ok(target); + } + + self.ensure_decoded()?; + let end = self.current_offset + target; + buffer[..target].copy_from_slice(&self.decoded_values[self.current_offset..end]); + self.current_offset = end; + self.num_values -= target; + Ok(target) + } + + fn values_left(&self) -> usize { + self.num_values + } + + fn encoding(&self) -> Encoding { + Encoding::ALP + } + + /// Skip up to `num_values` decoded values. + /// + /// For parity with C++ partial-read behavior, this may trigger deferred + /// page decoding before advancing the cursor. + fn skip(&mut self, num_values: usize) -> Result { + let to_skip = num_values.min(self.num_values); + if to_skip == 0 { + return Ok(0); + } + + self.ensure_decoded()?; + self.current_offset += to_skip; + self.num_values -= to_skip; + Ok(to_skip) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data_type::FloatType; + + fn make_alp_page_bytes( + compression_mode: u8, + integer_encoding: u8, + log_vector_size: u8, + num_elements: i32, + offsets: &[u32], + body_tail_len: usize, + ) -> Vec { + let mut out = Vec::with_capacity(ALP_HEADER_SIZE + offsets.len() * 4 + body_tail_len); + out.push(compression_mode); + out.push(integer_encoding); + out.push(log_vector_size); + out.extend_from_slice(&num_elements.to_le_bytes()); + for offset in offsets { + out.extend_from_slice(&offset.to_le_bytes()); + } + out.extend(std::iter::repeat_n(0u8, body_tail_len)); + out + } + + trait AppendLeBytes { + fn append_le_bytes(self, out: &mut Vec); + } + + impl AppendLeBytes for u32 { + fn append_le_bytes(self, out: &mut Vec) { + out.extend_from_slice(&self.to_le_bytes()); + } + } + + impl AppendLeBytes for u64 { + fn append_le_bytes(self, out: &mut Vec) { + out.extend_from_slice(&self.to_le_bytes()); + } + } + + struct VectorSpec<'a, Exact> { + exponent: u8, + factor: u8, + frame_of_reference: Exact, + bit_width: u8, + packed_values: &'a [u8], + exception_positions: &'a [u16], + exception_values: &'a [Exact], + } + + fn make_vector(spec: VectorSpec<'_, Exact>) -> Vec { + let num_exceptions = spec.exception_positions.len(); + assert_eq!(num_exceptions, spec.exception_values.len()); + assert!(u16::try_from(num_exceptions).is_ok()); + + let mut out = Vec::new(); + out.push(spec.exponent); + out.push(spec.factor); + out.extend_from_slice(&(num_exceptions as u16).to_le_bytes()); + spec.frame_of_reference.append_le_bytes(&mut out); + out.push(spec.bit_width); + out.extend_from_slice(spec.packed_values); + for position in spec.exception_positions { + out.extend_from_slice(&position.to_le_bytes()); + } + for value in spec.exception_values { + value.append_le_bytes(&mut out); + } + out + } + + fn make_page_from_vectors( + log_vector_size: u8, + num_elements: i32, + vectors: &[Vec], + ) -> Vec { + let vector_size = 1usize << log_vector_size; + let expected_num_vectors = if num_elements <= 0 { + 0 + } else { + (num_elements as usize).div_ceil(vector_size) + }; + assert_eq!(vectors.len(), expected_num_vectors); + + let offsets_section_size = vectors.len() * std::mem::size_of::(); + let mut offsets = Vec::with_capacity(vectors.len()); + let mut running_offset = offsets_section_size as u32; + for vector in vectors { + offsets.push(running_offset); + running_offset += vector.len() as u32; + } + + let mut page = make_alp_page_bytes(0, 0, log_vector_size, num_elements, &offsets, 0); + for vector in vectors { + page.extend_from_slice(vector); + } + page + } + + #[test] + fn test_parse_alp_page_layout_valid() { + let data = make_alp_page_bytes(0, 0, 3, 4, &[4], 13); + let parsed = parse_alp_page_layout::(Bytes::from(data)).unwrap(); + assert_eq!(parsed.header.num_elements, 4); + assert_eq!(parsed.vectors.len(), 1); + assert_eq!(parsed.vectors[0].num_elements, 4); + } + + #[test] + fn test_parse_alp_page_layout_short_header() { + let err = parse_alp_page_layout::(Bytes::from_static(&[0, 1, 2])).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: expected at least 7 bytes for header") + ); + } + + #[test] + fn test_parse_alp_page_layout_too_small_log_vector_size() { + let data = make_alp_page_bytes(0, 0, 2, 1, &[4], 8); + let err = parse_alp_page_layout::(Bytes::from(data)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: log_vector_size 2 below min 3") + ); + } + + #[test] + fn test_parse_alp_page_layout_too_big_log_vector_size() { + let data = make_alp_page_bytes(0, 0, 16, 1, &[4], 8); + let err = parse_alp_page_layout::(Bytes::from(data)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: log_vector_size 16 exceeds max 15") + ); + } + + #[test] + fn test_parse_alp_page_layout_invalid_integer_encoding() { + let data = make_alp_page_bytes(0, 1, 3, 1, &[4], 8); + let err = parse_alp_page_layout::(Bytes::from(data)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: unsupported integer encoding 1") + ); + } + + #[test] + fn test_parse_alp_page_layout_negative_num_elements() { + let data = make_alp_page_bytes(0, 0, 3, -1, &[4], 8); + let err = parse_alp_page_layout::(Bytes::from(data)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: num_elements -1 must be >= 0") + ); + } + + #[test] + fn test_parse_alp_page_layout_invalid_exponent_f32() { + let vector = make_vector(VectorSpec { + exponent: 11, + factor: 0, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: exponent 11 exceeds max 10") + ); + } + + #[test] + fn test_parse_alp_page_layout_invalid_factor_f32() { + let vector = make_vector(VectorSpec { + exponent: 0, + factor: 11, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: factor 11 exceeds exponent 0") + ); + } + + #[test] + fn test_parse_alp_page_layout_factor_exceeds_exponent() { + let vector = make_vector(VectorSpec { + exponent: 2, + factor: 3, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: factor 3 exceeds exponent 2") + ); + } + + #[test] + fn test_parse_alp_page_layout_invalid_num_exceptions() { + let vector = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[0, 0], + exception_values: &[0, 0], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: num_exceptions 2 exceeds vector num_elements 1") + ); + } + + #[test] + fn test_parse_alp_page_layout_invalid_exception_position() { + let vector = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[1], + exception_values: &[123], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string().contains( + "Invalid ALP page: exception position 1 out of bounds for vector length 1" + ) + ); + } + + #[test] + fn test_parse_alp_page_layout_non_monotonic_offsets() { + let data = make_alp_page_bytes(0, 0, 3, 9, &[12, 8], 12); + let err = parse_alp_page_layout::(Bytes::from(data)).unwrap_err(); + assert!( + err.to_string().contains( + "Invalid ALP page: vector offset 12 at index 0 does not match expected 8" + ) + ); + } + + #[test] + fn test_parse_alp_page_layout_truncated_vector_data() { + let mut vector = Vec::new(); + vector.push(0); + vector.push(0); + vector.extend_from_slice(&0u16.to_le_bytes()); + vector.extend_from_slice(&10u32.to_le_bytes()); + vector.push(1); + let page = make_page_from_vectors(3, 2, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: vector data too short") + ); + } + + #[test] + fn test_parse_alp_page_layout_vector_data_too_long() { + let mut vector = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + vector.push(0xAB); + + let page = make_page_from_vectors(3, 1, &[vector]); + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: vector data too long, expected 9 bytes, got 10") + ); + } + + #[test] + fn test_parse_alp_page_layout_body_size_mismatch_unclaimed_bytes() { + let vector = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + + // offsets section is 4 bytes for one vector, so offset=5 leaves one + // unclaimed byte between offsets and vector data. + let mut page = make_alp_page_bytes(0, 0, 3, 1, &[5], 0); + page.push(0); + page.extend_from_slice(&vector); + + let err = parse_alp_page_layout::(Bytes::from(page)).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid ALP page: vector offset 5 at index 0 does not match expected 4") + ); + } + + #[test] + fn test_parse_alp_page_layout_parses_vector_view_data_only_f64() { + let mut vector = Vec::new(); + + vector.push(2); + vector.push(0); + vector.extend_from_slice(&1u16.to_le_bytes()); + + vector.extend_from_slice(&10u64.to_le_bytes()); + vector.push(0); + + vector.extend_from_slice(&0u16.to_le_bytes()); + vector.extend_from_slice(&42.5_f64.to_le_bytes()); + + let offsets = [4u32]; + let mut page = make_alp_page_bytes(0, 0, 3, 1, &offsets, 0); + page.extend_from_slice(&vector); + + let parsed = parse_alp_page_layout::(Bytes::from(page)).unwrap(); + assert_eq!(parsed.vectors.len(), 1); + let parsed_vector = &parsed.vectors[0]; + assert_eq!(parsed_vector.num_elements, 1); + assert_eq!(parsed_vector.alp_info.num_exceptions, 1); + assert_eq!(parsed_vector.for_info.bit_width, 0); + assert_eq!(parsed_vector.exception_positions, vec![0]); + assert_eq!(parsed_vector.exception_values, vec![42.5_f64.to_bits()]); + } + + #[test] + fn test_bit_unpack_integers_width_zero() { + let unpacked = bit_unpack_integers::(Bytes::new(), 0, 3).unwrap(); + assert_eq!(unpacked, vec![0, 0, 0]); + } + + #[test] + fn test_bit_unpack_integers_width_two() { + let unpacked = + bit_unpack_integers::(Bytes::from_static(&[0b0010_0111]), 2, 3).unwrap(); + assert_eq!(unpacked, vec![3, 1, 2]); + } + + #[test] + fn test_inverse_for() { + let mut decoded = vec![0u32, 3, 2]; + inverse_for(&mut decoded, 10); + assert_eq!(decoded, vec![10, 13, 12]); + } + + #[test] + fn test_decode_page_values_f32_no_exceptions() { + let vector = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 10u32, + bit_width: 2, + packed_values: &[0b1110_0100], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 4, &[vector]); + let layout = parse_alp_page_layout::(Bytes::from(page)).unwrap(); + let decoded = decode_page_values::(&layout).unwrap(); + assert_eq!(decoded, vec![10.0, 11.0, 12.0, 13.0]); + } + + #[test] + fn test_decode_page_values_f64_multi_vector_with_exceptions() { + let vector0 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 10u64, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[1], + exception_values: &[42.5f64.to_bits()], + }); + let vector1 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 7u64, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 9, &[vector0, vector1]); + let layout = parse_alp_page_layout::(Bytes::from(page)).unwrap(); + let decoded = decode_page_values::(&layout).unwrap(); + assert_eq!( + decoded, + vec![10.0, 42.5, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 7.0] + ); + } + + #[test] + fn test_decode_page_values_f32_edge_values_via_exceptions() { + let edge_values = [ + f32::NAN.to_bits(), + (-0.0f32).to_bits(), + f32::INFINITY.to_bits(), + ]; + let vector = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 0u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[0, 1, 2], + exception_values: &edge_values, + }); + let page = make_page_from_vectors(3, 3, &[vector]); + let layout = parse_alp_page_layout::(Bytes::from(page)).unwrap(); + let decoded = decode_page_values::(&layout).unwrap(); + + assert!(decoded[0].is_nan()); + assert_eq!(decoded[1], -0.0); + assert!(decoded[1].is_sign_negative()); + assert_eq!(decoded[2], f32::INFINITY); + } + + #[test] + fn test_decode_page_values_f32_two_step_decimal_multiply() { + let encoded = 1_970_570_984_i32; + let vector = make_vector(VectorSpec { + exponent: 1, + factor: 1, + frame_of_reference: encoded as u32, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let layout = parse_alp_page_layout::(Bytes::from(page)).unwrap(); + let decoded = decode_page_values::(&layout).unwrap(); + + let expected_two_step = ((encoded as f32) * ALP_POW10_F32[1]) * ALP_NEG_POW10_F32[1]; + let one_step_scale = ALP_POW10_F32[1] * ALP_NEG_POW10_F32[1]; + let expected_one_step = (encoded as f32) * one_step_scale; + + assert_eq!(decoded[0].to_bits(), expected_two_step.to_bits()); + assert_ne!(decoded[0].to_bits(), expected_one_step.to_bits()); + } + + #[test] + fn test_decode_page_values_f64_two_step_decimal_multiply() { + let encoded = -3_900_047_474_048_127_703_i64; + let vector = make_vector(VectorSpec { + exponent: 1, + factor: 1, + frame_of_reference: encoded as u64, + bit_width: 0, + packed_values: &[], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 1, &[vector]); + let layout = parse_alp_page_layout::(Bytes::from(page)).unwrap(); + let decoded = decode_page_values::(&layout).unwrap(); + + let expected_two_step = ((encoded as f64) * ALP_POW10_F64[1]) * ALP_NEG_POW10_F64[1]; + let one_step_scale = ALP_POW10_F64[1] * ALP_NEG_POW10_F64[1]; + let expected_one_step = (encoded as f64) * one_step_scale; + + assert_eq!(decoded[0].to_bits(), expected_two_step.to_bits()); + assert_ne!(decoded[0].to_bits(), expected_one_step.to_bits()); + } + + #[test] + fn test_alp_decoder_get_across_vectors() { + let vector0 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 10u32, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[], + exception_values: &[], + }); + let vector1 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 20u32, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 12, &[vector0, vector1]); + + let mut decoder = AlpDecoder::::new(); + decoder.set_data(Bytes::from(page), 12).unwrap(); + + let mut first = [0.0f32; 9]; + let read = decoder.get(&mut first).unwrap(); + assert_eq!(read, 9); + assert_eq!( + first, + [10.0, 11.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 20.0] + ); + assert_eq!(decoder.values_left(), 3); + + let mut second = [0.0f32; 3]; + let read = decoder.get(&mut second).unwrap(); + assert_eq!(read, 3); + assert_eq!(second, [21.0, 20.0, 20.0]); + assert_eq!(decoder.values_left(), 0); + } + + #[test] + fn test_alp_decoder_skip_across_vectors() { + let vector0 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 10u32, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[], + exception_values: &[], + }); + let vector1 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 20u32, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 12, &[vector0, vector1]); + + let mut decoder = AlpDecoder::::new(); + decoder.set_data(Bytes::from(page), 12).unwrap(); + + let skipped = decoder.skip(9).unwrap(); + assert_eq!(skipped, 9); + assert_eq!(decoder.values_left(), 3); + + let mut out = [0.0f32; 1]; + let read = decoder.get(&mut out).unwrap(); + assert_eq!(read, 1); + assert_eq!(out[0], 21.0); + assert_eq!(decoder.values_left(), 2); + } + + #[test] + fn test_alp_decoder_get_fast_path_full_read() { + let vector0 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 10u32, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[], + exception_values: &[], + }); + let vector1 = make_vector(VectorSpec { + exponent: 0, + factor: 0, + frame_of_reference: 20u32, + bit_width: 1, + packed_values: &[0b0000_0010], + exception_positions: &[], + exception_values: &[], + }); + let page = make_page_from_vectors(3, 12, &[vector0, vector1]); + + let mut decoder = AlpDecoder::::new(); + decoder.set_data(Bytes::from(page), 12).unwrap(); + + let mut out = [0.0f32; 12]; + let read = decoder.get(&mut out).unwrap(); + assert_eq!(read, 12); + assert_eq!( + out, + [ + 10.0, 11.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 20.0, 21.0, 20.0, 20.0 + ] + ); + assert_eq!(decoder.values_left(), 0); + assert!(!decoder.needs_decode); + assert!(decoder.decoded_values.is_empty()); + assert!(decoder.layout.is_none()); + } +} diff --git a/parquet/src/format.rs b/parquet/src/format.rs index 101799d00350..402f20ddf488 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -450,6 +450,8 @@ impl Encoding { /// Added in 2.8 for FLOAT and DOUBLE. /// Support for INT32, INT64 and FIXED_LEN_BYTE_ARRAY added in 2.11. pub const BYTE_STREAM_SPLIT: Encoding = Encoding(9); + /// Adaptive Lossless floating-Point encoding (ALP) for FLOAT and DOUBLE. + pub const ALP: Encoding = Encoding(10); pub const ENUM_VALUES: &'static [Self] = &[ Self::PLAIN, Self::PLAIN_DICTIONARY, @@ -460,6 +462,7 @@ impl Encoding { Self::DELTA_BYTE_ARRAY, Self::RLE_DICTIONARY, Self::BYTE_STREAM_SPLIT, + Self::ALP, ]; } @@ -486,6 +489,7 @@ impl From for Encoding { 7 => Encoding::DELTA_BYTE_ARRAY, 8 => Encoding::RLE_DICTIONARY, 9 => Encoding::BYTE_STREAM_SPLIT, + 10 => Encoding::ALP, _ => Encoding(i) } } @@ -6041,4 +6045,3 @@ impl crate::thrift::TSerializable for FileCryptoMetaData { o_prot.write_struct_end() } } - diff --git a/parquet/tests/arrow_reader/alp.rs b/parquet/tests/arrow_reader/alp.rs new file mode 100644 index 000000000000..93f36c1aaffb --- /dev/null +++ b/parquet/tests/arrow_reader/alp.rs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::compute::concat_batches; +use arrow::util::test_util::parquet_test_data; +use arrow_array::cast::as_primitive_array; +use arrow_array::types::Float32Type; +use arrow_array::{Array, ArrayRef, Float32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use parquet::arrow::arrow_reader::ArrowReaderBuilder; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::PathBuf; +use std::sync::Arc; + +#[test] +fn test_read_f32_alp() { + let data_dir = PathBuf::from(parquet_test_data()); + let parquet_path = data_dir.join("alp_float_arade.parquet"); + let expected_csv_path = data_dir.join("alp_arade_expect.csv"); + if !parquet_path.exists() || !expected_csv_path.exists() { + eprintln!("Skipping ALP test files not found"); + return; + } + + let expected = read_expected_csv_batch(&expected_csv_path); + let actual = read_parquet_batch(&parquet_path); + + assert_eq!(actual.schema(), expected.schema(), "schema mismatch"); + assert_eq!( + actual.num_columns(), + expected.num_columns(), + "column mismatch" + ); + assert_eq!(actual.num_rows(), expected.num_rows(), "row count mismatch"); + + for col_idx in 0..actual.num_columns() { + let col_name = actual.schema().field(col_idx).name().clone(); + let actual_col = as_primitive_array::(actual.column(col_idx).as_ref()); + let expected_col = as_primitive_array::(expected.column(col_idx).as_ref()); + + for row_idx in 0..actual.num_rows() { + assert_eq!( + actual_col.is_valid(row_idx), + expected_col.is_valid(row_idx), + "null mismatch at column {col_name} row {row_idx}" + ); + if actual_col.is_valid(row_idx) { + let actual_value = actual_col.value(row_idx); + let expected_value = expected_col.value(row_idx); + assert!( + actual_value.to_bits() == expected_value.to_bits(), + "bit mismatch at column {col_name} row {row_idx}: expected={expected_value} actual={actual_value}" + ); + } + } + } +} + +fn alp_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("value1", DataType::Float32, true), + Field::new("value2", DataType::Float32, true), + Field::new("value3", DataType::Float32, true), + Field::new("value4", DataType::Float32, true), + ])) +} + +fn read_parquet_batch(path: &PathBuf) -> RecordBatch { + let file = File::open(path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap(); + let mut batches = Vec::new(); + for batch in reader { + batches.push(batch.unwrap()); + } + assert!(!batches.is_empty(), "expected non-empty parquet batch set"); + concat_batches(batches[0].schema_ref(), &batches).unwrap() +} + +fn read_expected_csv_batch(path: &PathBuf) -> RecordBatch { + let file = File::open(path).unwrap(); + let mut lines = BufReader::new(file).lines(); + + let header = lines.next().expect("expected csv header line").unwrap(); + assert_eq!( + header.trim(), + "value1,value2,value3,value4", + "unexpected csv header" + ); + + let mut c0 = Vec::new(); + let mut c1 = Vec::new(); + let mut c2 = Vec::new(); + let mut c3 = Vec::new(); + for (line_idx, line) in lines.enumerate() { + let line = line.unwrap(); + if line.trim().is_empty() { + continue; + } + let values: Vec<_> = line.split(',').map(str::trim).collect(); + assert_eq!( + values.len(), + 4, + "wrong csv column count at line {}", + line_idx + 2 + ); + c0.push(values[0].parse::().unwrap()); + c1.push(values[1].parse::().unwrap()); + c2.push(values[2].parse::().unwrap()); + c3.push(values[3].parse::().unwrap()); + } + + RecordBatch::try_new( + alp_schema(), + vec![ + Arc::new(Float32Array::from(c0)) as ArrayRef, + Arc::new(Float32Array::from(c1)) as ArrayRef, + Arc::new(Float32Array::from(c2)) as ArrayRef, + Arc::new(Float32Array::from(c3)) as ArrayRef, + ], + ) + .unwrap() +} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index d9729fdc7703..79148c77389f 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -38,6 +38,7 @@ use parquet::file::properties::{ use std::sync::Arc; use tempfile::NamedTempFile; +mod alp; mod bad_data; mod bloom_filter; #[cfg(feature = "crc")]