3 changes: 3 additions & 0 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -580,6 +580,9 @@ impl ByteArrayDecoderDictionary {
return Ok(0);
}

// Pre-reserve offsets capacity to avoid per-chunk reallocation
output.offsets.reserve(len);

self.decoder.read(len, |keys| {
output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
})
102 changes: 38 additions & 64 deletions parquet/src/arrow/array_reader/byte_view_array.rs
@@ -30,7 +30,6 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::utf8::check_valid_utf8;
use arrow_array::{ArrayRef, builder::make_view};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
@@ -475,74 +474,49 @@ impl ByteViewArrayDecoderDictionary {
return Ok(0);
}

        // Check if the last few buffers of `output` are the same as the `dict` buffers.
        // This avoids creating new buffers when the same dictionary is used across multiple `read` calls.
let need_to_create_new_buffer = {
if output.buffers.len() >= dict.buffers.len() {
let offset = output.buffers.len() - dict.buffers.len();
output.buffers[offset..]
.iter()
.zip(dict.buffers.iter())
.any(|(a, b)| !a.ptr_eq(b))
} else {
true
}
};
// Check if all dictionary views are inlined (len <= 12).
// If so, we can skip buffer management entirely since inline views
// don't reference any buffers.
let all_views_inlined = dict.views.iter().all(|&v| (v as u32) <= 12);

if !all_views_inlined {
            // Check if the last few buffers of `output` are the same as the `dict` buffers.
            // This avoids creating new buffers when the same dictionary is used across multiple `read` calls.
let need_to_create_new_buffer = {
if output.buffers.len() >= dict.buffers.len() {
let offset = output.buffers.len() - dict.buffers.len();
output.buffers[offset..]
.iter()
.zip(dict.buffers.iter())
.any(|(a, b)| !a.ptr_eq(b))
} else {
true
}
};

if need_to_create_new_buffer {
for b in dict.buffers.iter() {
output.buffers.push(b.clone());
if need_to_create_new_buffer {
for b in dict.buffers.iter() {
output.buffers.push(b.clone());
}
}
}

// Calculate the offset of the dictionary buffers in the output buffers
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
// then the base_buffer_idx is 5 - 2 = 3
let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

let mut error = None;
let read = self.decoder.read(len, |keys| {
if base_buffer_idx == 0 {
// the dictionary buffers are the last buffers in output, we can directly use the views
output
.views
.extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
Some(&view) => view,
None => {
if error.is_none() {
error = Some(general_err!("invalid key={} for dictionary", *k));
}
0
}
}));
Ok(())
} else {
output
.views
.extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
Some(&view) => {
let len = view as u32;
if len <= 12 {
view
} else {
let mut view = ByteView::from(view);
view.buffer_index += base_buffer_idx;
view.into()
}
}
None => {
if error.is_none() {
error = Some(general_err!("invalid key={} for dictionary", *k));
}
0
}
}));
Ok(())
}
})?;
if let Some(e) = error {
return Err(e);
}
let base_buffer_idx = if all_views_inlined {
0
} else {
output.buffers.len() as u32 - dict.buffers.len() as u32
};

// Pre-reserve output capacity to avoid per-chunk reallocation in extend
output.views.reserve(len);

let read = self.decoder.read_gather_views(
len,
&dict.views,
&mut output.views,
base_buffer_idx,
)?;
Ok(read)
}

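For context on the inline-view checks above: the `u128` views follow arrow's byte-view layout, where the low 32 bits hold the string length and strings of 12 bytes or fewer are stored inline with no buffer reference. A minimal sketch of the remapping this diff relies on (the helper name `remap_view` is illustrative, not part of the change):

use arrow_data::ByteView;

// A byte view packs four u32 fields into a u128: [length | prefix | buffer_index | offset],
// so `view as u32` yields the length. Lengths <= 12 mean the bytes live inline in the
// view word itself, which is why an all-inline dictionary needs no buffer bookkeeping.
fn remap_view(view: u128, base_buffer_idx: u32) -> u128 {
    if (view as u32) <= 12 {
        view
    } else {
        // Out-of-line view: shift its buffer index to where the dictionary
        // buffers were appended in the output builder.
        let mut bv = ByteView::from(view);
        bv.buffer_index += base_buffer_idx;
        bv.into()
    }
}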
86 changes: 82 additions & 4 deletions parquet/src/arrow/decoder/dictionary_index.rs
@@ -17,7 +17,7 @@

use bytes::Bytes;

use crate::encodings::rle::RleDecoder;
use crate::encodings::rle::{RleDecodedBatch, RleDecoder};
use crate::errors::Result;

/// Decoder for `Encoding::RLE_DICTIONARY` indices
@@ -27,7 +27,7 @@ pub struct DictIndexDecoder {

/// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
/// offsets
index_buf: Box<[i32; 1024]>,
index_buf: Vec<i32>,
/// Current length of `index_buf`
index_buf_len: usize,
/// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
@@ -46,13 +46,14 @@ impl DictIndexDecoder {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data.slice(1..))?;
let max_remaining = num_values.unwrap_or(num_levels);

Ok(Self {
decoder,
index_buf: Box::new([0; 1024]),
index_buf: vec![0; max_remaining.min(1024)],
index_buf_len: 0,
index_offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
max_remaining_values: max_remaining,
})
}

@@ -91,6 +92,83 @@ impl DictIndexDecoder {
Ok(values_read)
}

/// Decode indices and gather views directly into `output`.
///
/// For RLE runs, fills output with the repeated view directly (no index buffer needed).
/// For bit-packed runs, decodes indices to a stack-local buffer and gathers immediately.
pub fn read_gather_views(
&mut self,
len: usize,
dict_views: &[u128],
output: &mut Vec<u128>,
base_buffer_idx: u32,
) -> Result<usize> {
let to_read = len.min(self.max_remaining_values);
let mut values_read = 0;

// Flush any buffered indices from previous reads
if self.index_offset < self.index_buf_len {
let n = (self.index_buf_len - self.index_offset).min(to_read);
let keys = &self.index_buf[self.index_offset..self.index_offset + n];
Self::gather_views(keys, dict_views, output, base_buffer_idx);
self.index_offset += n;
self.max_remaining_values -= n;
values_read += n;
}

if values_read < to_read {
let read = self.decoder.get_batch_direct(to_read - values_read, |batch| {
match batch {
RleDecodedBatch::Rle { index, count } => {
let view = dict_views[index as usize];
let view = if base_buffer_idx == 0 || (view as u32) <= 12 {
view
} else {
let mut bv = arrow_data::ByteView::from(view);
bv.buffer_index += base_buffer_idx;
bv.into()
};
output.extend(std::iter::repeat_n(view, count));
}
RleDecodedBatch::BitPacked(keys) => {
Self::gather_views(keys, dict_views, output, base_buffer_idx);
}
}
})?;
self.max_remaining_values -= read;
values_read += read;
}

Ok(values_read)
}

fn gather_views(
keys: &[i32],
dict_views: &[u128],
output: &mut Vec<u128>,
base_buffer_idx: u32,
) {
// Clamp index to valid range to prevent UB on corrupt data.
// This is branchless (cmp+csel on ARM) and avoids bounds checks in the hot loop.
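        // `dict_views` is assumed non-empty here; an empty dictionary would make `max_idx` underflow.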
let max_idx = dict_views.len() - 1;
if base_buffer_idx == 0 {
output.extend(keys.iter().map(|k| unsafe {
*dict_views.get_unchecked((*k as usize).min(max_idx))
}));
} else {
output.extend(keys.iter().map(|k| {
let view = unsafe { *dict_views.get_unchecked((*k as usize).min(max_idx)) };
if (view as u32) <= 12 {
view
} else {
let mut bv = arrow_data::ByteView::from(view);
bv.buffer_index += base_buffer_idx;
bv.into()
}
}));
}
}

/// Skip up to `to_skip` values, returning the number of values skipped
pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.max_remaining_values);
23 changes: 18 additions & 5 deletions parquet/src/compression.rs
@@ -201,7 +201,7 @@ mod snappy_codec {
use snap::raw::{Decoder, Encoder, decompress_len, max_compress_len};

use crate::compression::Codec;
use crate::errors::Result;
use crate::errors::{ParquetError, Result};

/// Codec for Snappy compression format.
pub struct SnappyCodec {
@@ -231,10 +231,23 @@ mod snappy_codec {
None => decompress_len(input_buf)?,
};
let offset = output_buf.len();
output_buf.resize(offset + len, 0);
self.decoder
.decompress(input_buf, &mut output_buf[offset..])
.map_err(|e| e.into())
output_buf.reserve(len);
// SAFETY: we pass the spare capacity to snappy which will write exactly
// `len` bytes on success. The `set_len` below is only reached when
// decompression succeeds. `MaybeUninit<u8>` has the same layout as `u8`.
let spare = output_buf.spare_capacity_mut();
let spare_bytes = unsafe {
std::slice::from_raw_parts_mut(spare.as_mut_ptr().cast::<u8>(), spare.len())
};
let n = self
.decoder
.decompress(input_buf, &mut spare_bytes[..len])
.map_err(|e| -> ParquetError { e.into() })?;
// SAFETY: snappy wrote exactly `n` bytes into the spare capacity
unsafe {
output_buf.set_len(offset + n);
}
Ok(n)
}
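// The pattern above, reduced to its core (illustrative sketch, not part of this change):
// append into reserved-but-uninitialized capacity and only commit the bytes the callee
// reports having written, avoiding the up-front zeroing of `resize(offset + len, 0)`.
fn append_without_zeroing(out: &mut Vec<u8>, len: usize, fill: impl FnOnce(&mut [u8]) -> usize) {
    let offset = out.len();
    out.reserve(len);
    let spare = out.spare_capacity_mut();
    // SAFETY: `MaybeUninit<u8>` has the same layout as `u8`, and `fill` is assumed to only
    // write into the slice (never read from it) and to return how many bytes it wrote.
    let spare = unsafe { std::slice::from_raw_parts_mut(spare.as_mut_ptr().cast::<u8>(), spare.len()) };
    let written = fill(&mut spare[..len]);
    // SAFETY: the first `written` bytes of the spare capacity are now initialized.
    unsafe { out.set_len(offset + written) };
}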

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
67 changes: 66 additions & 1 deletion parquet/src/encodings/rle.rs
@@ -287,6 +287,15 @@ impl RleEncoder {
/// Size, in number of `i32s` of buffer to use for RLE batch reading
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;

/// A decoded batch from [`RleDecoder::get_batch_direct`].
#[cfg(feature = "arrow")]
pub enum RleDecodedBatch<'a> {
/// An RLE run: all values are the same index, repeated `count` times
Rle { index: i32, count: usize },
/// A batch of bit-packed indices
BitPacked(&'a [i32]),
}
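// For orientation (illustrative note, not part of this change): the two variants mirror
// the two run types of the Parquet RLE/bit-packed hybrid encoding. Each run starts with a
// varint header whose low bit selects the type:
//   header & 1 == 0  =>  RLE run: (header >> 1) repetitions of a single fixed-width value
//   header & 1 == 1  =>  bit-packed run: (header >> 1) groups of 8 bit-packed indices
// An RLE run can therefore be materialized with a single repeat, while a bit-packed run
// still requires unpacking individual indices into a scratch buffer.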

/// A RLE/Bit-Packing hybrid decoder.
pub struct RleDecoder {
// Number of bits used to encode the value. Must be between [0, 64].
@@ -414,6 +423,53 @@ impl RleDecoder {
Ok(values_read)
}

/// Decode up to `max_values` indices and call `f` with each decoded batch.
///
/// For RLE runs, provides [`RleDecodedBatch::Rle`] so callers can fill output directly.
/// For bit-packed runs, provides [`RleDecodedBatch::BitPacked`] with decoded indices.
#[cfg(feature = "arrow")]
pub fn get_batch_direct<F>(
&mut self,
max_values: usize,
mut f: F,
) -> Result<usize>
where
F: FnMut(RleDecodedBatch<'_>),
{
let mut values_read = 0;
let mut index_buf = [0i32; 1024];
while values_read < max_values {
if self.rle_left > 0 {
let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
let idx = self.current_value.unwrap() as i32;
f(RleDecodedBatch::Rle { index: idx, count: num_values });
self.rle_left -= num_values as u32;
values_read += num_values;
} else if self.bit_packed_left > 0 {
let to_read = (max_values - values_read)
.min(self.bit_packed_left as usize)
.min(index_buf.len());
let bit_reader = self
.bit_reader
.as_mut()
.ok_or_else(|| general_err!("bit_reader should be set"))?;

let num_values =
bit_reader.get_batch::<i32>(&mut index_buf[..to_read], self.bit_width as usize);
if num_values == 0 {
self.bit_packed_left = 0;
continue;
}
f(RleDecodedBatch::BitPacked(&index_buf[..num_values]));
self.bit_packed_left -= num_values as u32;
values_read += num_values;
} else if !self.reload()? {
break;
}
}
Ok(values_read)
}

#[inline(never)]
pub fn skip(&mut self, num_values: usize) -> Result<usize> {
let mut values_skipped = 0;
@@ -458,6 +514,13 @@ impl RleDecoder {
{
assert!(buffer.len() >= max_values);

if dict.is_empty() {
return Ok(0);
}
// Clamp index to valid range to prevent UB on corrupt data.
// This is branchless (cmp+csel on ARM) and avoids bounds checks in the hot loop.
let max_idx = dict.len() - 1;

let mut values_read = 0;
while values_read < max_values {
let index_buf = self.index_buf.get_or_insert_with(|| Box::new([0; 1024]));
@@ -497,7 +560,9 @@ impl RleDecoder {
buffer[values_read..values_read + num_values]
.iter_mut()
.zip(index_buf[..num_values].iter())
.for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
.for_each(|(b, i)| unsafe {
b.clone_from(dict.get_unchecked((*i as usize).min(max_idx)));
});
self.bit_packed_left -= num_values as u32;
values_read += num_values;
if num_values < to_read {