From 00c528d1b26711d868fad3cacfad93361daf450f Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 2 Mar 2026 17:30:30 +0530 Subject: [PATCH] update --- parquet/file/column_reader.go | 3 +- parquet/file/column_writer_test.go | 9 + parquet/file/file_reader_test.go | 58 + parquet/file/file_writer_test.go | 149 ++ parquet/internal/encoding/alp_encoding.go | 966 ++++++++++++ .../internal/encoding/alp_encoding_test.go | 1306 +++++++++++++++++ parquet/internal/encoding/encoding_test.go | 11 + parquet/internal/encoding/typed_encoder.go | 4 + parquet/internal/gen-go/parquet/parquet.go | 3 + parquet/types.go | 2 + 10 files changed, 2510 insertions(+), 1 deletion(-) create mode 100644 parquet/internal/encoding/alp_encoding.go create mode 100644 parquet/internal/encoding/alp_encoding_test.go diff --git a/parquet/file/column_reader.go b/parquet/file/column_reader.go index 01d1e5beb..df7e3a9b6 100644 --- a/parquet/file/column_reader.go +++ b/parquet/file/column_reader.go @@ -488,7 +488,8 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error { format.Encoding_DELTA_BYTE_ARRAY, format.Encoding_DELTA_LENGTH_BYTE_ARRAY, format.Encoding_DELTA_BINARY_PACKED, - format.Encoding_BYTE_STREAM_SPLIT: + format.Encoding_BYTE_STREAM_SPLIT, + format.Encoding_ALP: c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem) c.decoders[encoding] = c.curDecoder case format.Encoding_RLE_DICTIONARY: diff --git a/parquet/file/column_writer_test.go b/parquet/file/column_writer_test.go index 8f5d35d68..83a178524 100644 --- a/parquet/file/column_writer_test.go +++ b/parquet/file/column_writer_test.go @@ -502,6 +502,15 @@ func (p *PrimitiveWriterTestSuite) TestRequiredByteStreamSplit() { } } +func (p *PrimitiveWriterTestSuite) TestRequiredAlp() { + switch p.Typ { + case reflect.TypeOf(float32(0)), reflect.TypeOf(float64(0)): + p.testRequiredWithEncoding(parquet.Encodings.ALP) + default: + p.Panics(func() { p.testRequiredWithEncoding(parquet.Encodings.ALP) }) + } +} + func (p *PrimitiveWriterTestSuite) TestRequiredDictionary() { p.testRequiredWithEncoding(parquet.Encodings.PlainDict) } diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go index c764ff070..1a5273671 100644 --- a/parquet/file/file_reader_test.go +++ b/parquet/file/file_reader_test.go @@ -24,6 +24,7 @@ import ( "encoding/csv" "fmt" "io" + "math" "os" "path" "testing" @@ -941,3 +942,60 @@ func TestListColumns(t *testing.T) { } } } + +func TestAlpEncodingFileRead(t *testing.T) { + testFile := os.Getenv("ALP_TEST_FILE") + if testFile == "" { + t.Skip("ALP_TEST_FILE not set, skipping") + } + if _, err := os.Stat(testFile); os.IsNotExist(err) { + t.Skipf("ALP_TEST_FILE not found: %s", testFile) + } + + props := parquet.NewReaderProperties(memory.DefaultAllocator) + fileReader, err := file.OpenParquetFile(testFile, false, file.WithReadProps(props)) + require.NoError(t, err) + defer fileReader.Close() + + nRows := 1024 + require.Equal(t, 1, fileReader.NumRowGroups()) + meta := fileReader.MetaData() + require.EqualValues(t, nRows, meta.GetNumRows()) + require.Equal(t, 1, meta.Schema.NumColumns()) + require.Equal(t, "value_f64", meta.Schema.Column(0).Name()) + require.Equal(t, parquet.Types.Double, meta.Schema.Column(0).PhysicalType()) + + rgr := fileReader.RowGroup(0) + require.EqualValues(t, nRows, rgr.NumRows()) + + rdr, err := rgr.Column(0) + require.NoError(t, err) + + f64Reader, ok := rdr.(*file.Float64ColumnChunkReader) + require.True(t, ok) + + values := make([]float64, nRows) + total, read, err := f64Reader.ReadBatch(int64(nRows), values, nil, nil) + require.NoError(t, err) + require.EqualValues(t, nRows, total) + require.EqualValues(t, nRows, read) + + md, err := rgr.MetaData().ColumnChunk(0) + require.NoError(t, err) + encodings := md.Encodings() + found := false + for _, enc := range encodings { + if enc == parquet.Encodings.ALP { + found = true + break + } + } + require.True(t, found, "expected ALP encoding in column chunk metadata, got %v", encodings) + + for i := 0; i < nRows; i++ { + expected := 0.125 + float64(i)*0.25 + assert.Equal(t, math.Float64bits(expected), math.Float64bits(values[i]), + "value[%d]: got %v (bits %016x), want %v (bits %016x)", + i, values[i], math.Float64bits(values[i]), expected, math.Float64bits(expected)) + } +} diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go index 7a6c6ae12..63178302e 100644 --- a/parquet/file/file_writer_test.go +++ b/parquet/file/file_writer_test.go @@ -592,6 +592,155 @@ func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) { require.NoError(t, rdr.Close()) } +func TestAlpFileRoundtrip(t *testing.T) { + for _, tc := range []struct { + name string + values []float64 + }{ + { + name: "monetary", + values: []float64{1.23, 4.56, 7.89, 10.11, 12.13, 14.15, 16.17, 18.19}, + }, + { + name: "large_vector", + values: func() []float64 { + v := make([]float64, 2048) + for i := range v { + v[i] = 0.125 + float64(i)*0.25 + } + return v + }(), + }, + { + name: "integers", + values: []float64{1, 2, 3, 100, 200, 300, 1000, 2000}, + }, + { + name: "special_values", + values: []float64{0, -0.0, 1.5, -1.5, math.Inf(1), math.Inf(-1), math.NaN()}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + props := parquet.NewWriterProperties( + parquet.WithEncoding(parquet.Encodings.ALP), + parquet.WithDictionaryDefault(false), + ) + + field, err := schema.NewPrimitiveNode("value", parquet.Repetitions.Required, parquet.Types.Double, -1, -1) + require.NoError(t, err) + + sc, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0) + require.NoError(t, err) + + sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) + writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props)) + + rgw := writer.AppendRowGroup() + cw, err := rgw.NextColumn() + require.NoError(t, err) + + f64Writer, ok := cw.(*file.Float64ColumnChunkWriter) + require.True(t, ok) + + nVals, err := f64Writer.WriteBatch(tc.values, nil, nil) + require.NoError(t, err) + require.EqualValues(t, len(tc.values), nVals) + + require.NoError(t, cw.Close()) + require.NoError(t, rgw.Close()) + require.NoError(t, writer.Close()) + + reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes())) + require.NoError(t, err) + + require.Equal(t, 1, reader.NumRowGroups()) + require.EqualValues(t, len(tc.values), reader.NumRows()) + + rgr := reader.RowGroup(0) + cr, err := rgr.Column(0) + require.NoError(t, err) + + f64Reader, ok := cr.(*file.Float64ColumnChunkReader) + require.True(t, ok) + + output := make([]float64, len(tc.values)) + total, read, err := f64Reader.ReadBatch(int64(len(tc.values)), output, nil, nil) + require.NoError(t, err) + require.EqualValues(t, len(tc.values), total) + require.EqualValues(t, len(tc.values), read) + + // Bit-exact comparison (handles NaN, -0.0 correctly) + for i := range tc.values { + assert.Equal(t, math.Float64bits(tc.values[i]), math.Float64bits(output[i]), + "index %d: got %v (bits %016x), want %v (bits %016x)", + i, output[i], math.Float64bits(output[i]), tc.values[i], math.Float64bits(tc.values[i])) + } + + require.NoError(t, reader.Close()) + }) + } +} + +func TestAlpFloat32FileRoundtrip(t *testing.T) { + values := []float32{1.23, 4.56, 7.89, 0, -1.5, 100.0, 0.001} + + props := parquet.NewWriterProperties( + parquet.WithEncoding(parquet.Encodings.ALP), + parquet.WithDictionaryDefault(false), + ) + + field, err := schema.NewPrimitiveNode("value", parquet.Repetitions.Required, parquet.Types.Float, -1, -1) + require.NoError(t, err) + + sc, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0) + require.NoError(t, err) + + sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) + writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props)) + + rgw := writer.AppendRowGroup() + cw, err := rgw.NextColumn() + require.NoError(t, err) + + f32Writer, ok := cw.(*file.Float32ColumnChunkWriter) + require.True(t, ok) + + nVals, err := f32Writer.WriteBatch(values, nil, nil) + require.NoError(t, err) + require.EqualValues(t, len(values), nVals) + + require.NoError(t, cw.Close()) + require.NoError(t, rgw.Close()) + require.NoError(t, writer.Close()) + + reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes())) + require.NoError(t, err) + + require.Equal(t, 1, reader.NumRowGroups()) + require.EqualValues(t, len(values), reader.NumRows()) + + rgr := reader.RowGroup(0) + cr, err := rgr.Column(0) + require.NoError(t, err) + + f32Reader, ok := cr.(*file.Float32ColumnChunkReader) + require.True(t, ok) + + output := make([]float32, len(values)) + total, read, err := f32Reader.ReadBatch(int64(len(values)), output, nil, nil) + require.NoError(t, err) + require.EqualValues(t, len(values), total) + require.EqualValues(t, len(values), read) + + for i := range values { + assert.Equal(t, math.Float32bits(values[i]), math.Float32bits(output[i]), + "index %d: got %v (bits %08x), want %v (bits %08x)", + i, output[i], math.Float32bits(output[i]), values[i], math.Float32bits(values[i])) + } + + require.NoError(t, reader.Close()) +} + func TestLZ4RawFileRoundtrip(t *testing.T) { input := []int64{ -1, 0, 1, 2, 3, 4, 5, 123456789, -123456789, diff --git a/parquet/internal/encoding/alp_encoding.go b/parquet/internal/encoding/alp_encoding.go new file mode 100644 index 000000000..264c91fd4 --- /dev/null +++ b/parquet/internal/encoding/alp_encoding.go @@ -0,0 +1,966 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +// ALP (Adaptive Lossless floating-Point) encoding for FLOAT and DOUBLE types. +// Reference: https://dl.acm.org/doi/10.1145/3626717 + +import ( + "encoding/binary" + "fmt" + "math" + "math/bits" + "sort" + + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/parquet" + format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet" + "github.com/apache/arrow-go/v18/parquet/schema" + "golang.org/x/xerrors" +) + +const ( + alpVersion = 1 + alpCompressionMode = 0 // ALP + alpIntegerEncodingFOR = 0 // kForBitPack + alpHeaderSize = 8 + alpDefaultVectorSize = 1024 + alpDefaultLogVector = 10 + alpMaxLogVectorSize = 15 + alpMinLogVectorSize = 3 + + alpInfoSize = 4 // exponent(1) + factor(1) + num_exceptions(2) + alpFloatForInfo = 5 // frame_of_reference(4) + bit_width(1) + alpDoubleForInfo = 9 // frame_of_reference(8) + bit_width(1) + + alpFloatMaxExponent = 10 + alpDoubleMaxExponent = 18 + + alpSamplerVectors = 8 + alpMaxPresetCombinations = 5 + + alpMagicFloat32 = float32(12582912.0) // 2^22 + 2^23 + alpMagicFloat64 = float64(6755399441055744.0) // 2^51 + 2^52 + + alpNegZeroFloat32Bits = uint32(0x80000000) + alpNegZeroFloat64Bits = uint64(0x8000000000000000) +) + +var ( + alpFloatPow10 = [11]float32{ + 1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, + } + alpDoublePow10 = [19]float64{ + 1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, + 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18, + } +) +func alpFloatMultiplier(exponent, factor int) float32 { + m := alpFloatPow10[exponent] + if factor > 0 { + m /= alpFloatPow10[factor] + } + return m +} + +func alpDoubleMultiplier(exponent, factor int) float64 { + m := alpDoublePow10[exponent] + if factor > 0 { + m /= alpDoublePow10[factor] + } + return m +} + +func alpFastRoundFloat32(v float32) int32 { + if v >= 0 { + return int32((v + alpMagicFloat32) - alpMagicFloat32) + } + return int32((v - alpMagicFloat32) + alpMagicFloat32) +} + +func alpFastRoundFloat64(v float64) int64 { + if v >= 0 { + return int64((v + alpMagicFloat64) - alpMagicFloat64) + } + return int64((v - alpMagicFloat64) + alpMagicFloat64) +} + +func alpIsBasicFloatException(v float32) bool { + return math.IsNaN(float64(v)) || math.IsInf(float64(v), 0) || + math.Float32bits(v) == alpNegZeroFloat32Bits +} + +func alpIsBasicDoubleException(v float64) bool { + return math.IsNaN(v) || math.IsInf(v, 0) || + math.Float64bits(v) == alpNegZeroFloat64Bits +} + +func alpIsFloatException(v float32, exponent, factor int) bool { + if alpIsBasicFloatException(v) { + return true + } + m := alpFloatMultiplier(exponent, factor) + scaled := float64(v) * float64(m) + if scaled > math.MaxInt32 || scaled < math.MinInt32 { + return true + } + encoded := alpFastRoundFloat32(v * m) + decoded := float32(encoded) / m + return math.Float32bits(v) != math.Float32bits(decoded) +} + +func alpIsDoubleException(v float64, exponent, factor int) bool { + if alpIsBasicDoubleException(v) { + return true + } + m := alpDoubleMultiplier(exponent, factor) + scaled := v * m + if scaled > math.MaxInt64 || scaled < math.MinInt64 { + return true + } + encoded := alpFastRoundFloat64(v * m) + decoded := float64(encoded) / m + return math.Float64bits(v) != math.Float64bits(decoded) +} + +func alpEncodeFloat32(v float32, exponent, factor int) int32 { + return alpFastRoundFloat32(v * alpFloatMultiplier(exponent, factor)) +} + +func alpDecodeFloat32(encoded int32, exponent, factor int) float32 { + return float32(encoded) / alpFloatMultiplier(exponent, factor) +} + +func alpEncodeFloat64(v float64, exponent, factor int) int64 { + return alpFastRoundFloat64(v * alpDoubleMultiplier(exponent, factor)) +} + +func alpDecodeFloat64(encoded int64, exponent, factor int) float64 { + return float64(encoded) / alpDoubleMultiplier(exponent, factor) +} + +func alpBitWidth32(v uint32) int { + if v == 0 { + return 0 + } + return bits.Len32(v) +} + +func alpBitWidth64(v uint64) int { + if v == 0 { + return 0 + } + return bits.Len64(v) +} +func alpPackBits32(values []uint32, count, bitWidth int) []byte { + if bitWidth == 0 || count == 0 { + return nil + } + packedSize := (count*bitWidth + 7) / 8 + out := make([]byte, packedSize) + + var buf uint64 + bitsInBuf := 0 + byteIdx := 0 + + for i := 0; i < count; i++ { + buf |= uint64(values[i]) << uint(bitsInBuf) + bitsInBuf += bitWidth + + for bitsInBuf >= 8 { + out[byteIdx] = byte(buf) + buf >>= 8 + bitsInBuf -= 8 + byteIdx++ + } + } + + if bitsInBuf > 0 { + out[byteIdx] = byte(buf) + } + + return out +} + +func alpUnpackBits32(packed []byte, count, bitWidth int) []uint32 { + out := make([]uint32, count) + if bitWidth == 0 || count == 0 { + return out + } + + var buf uint64 + bitsInBuf := 0 + byteIdx := 0 + mask := uint64((1 << uint(bitWidth)) - 1) + + for i := 0; i < count; i++ { + for bitsInBuf < bitWidth && byteIdx < len(packed) { + buf |= uint64(packed[byteIdx]) << uint(bitsInBuf) + bitsInBuf += 8 + byteIdx++ + } + + out[i] = uint32(buf & mask) + buf >>= uint(bitWidth) + bitsInBuf -= bitWidth + } + + return out +} + +func alpPackBits64(values []uint64, count, bitWidth int) []byte { + if bitWidth == 0 || count == 0 { + return nil + } + packedSize := (count*bitWidth + 7) / 8 + out := make([]byte, packedSize) + + byteIdx := 0 + bitPos := 0 + + for i := 0; i < count; i++ { + val := values[i] + remaining := bitWidth + + for remaining > 0 { + bitIdx := bitPos & 7 + bitsToWrite := remaining + if bitsToWrite > 8-bitIdx { + bitsToWrite = 8 - bitIdx + } + m := uint64((1 << uint(bitsToWrite)) - 1) + out[byteIdx] |= byte((val & m) << uint(bitIdx)) + val >>= uint(bitsToWrite) + bitPos += bitsToWrite + remaining -= bitsToWrite + byteIdx = bitPos >> 3 + } + } + + return out +} + +func alpUnpackBits64(packed []byte, count, bitWidth int) []uint64 { + out := make([]uint64, count) + if bitWidth == 0 || count == 0 { + return out + } + + byteIdx := 0 + bitPos := 0 + + for i := 0; i < count; i++ { + var val uint64 + remaining := bitWidth + shift := 0 + + for remaining > 0 { + bitIdx := bitPos & 7 + bitsToRead := remaining + if bitsToRead > 8-bitIdx { + bitsToRead = 8 - bitIdx + } + m := byte((1 << uint(bitsToRead)) - 1) + val |= uint64((packed[byteIdx]>>uint(bitIdx))&m) << uint(shift) + shift += bitsToRead + bitPos += bitsToRead + remaining -= bitsToRead + byteIdx = bitPos >> 3 + } + + out[i] = val + } + + return out +} +type alpEncodingParams struct { + exponent int + factor int + numExceptions int +} + +func alpFindBestFloat32Params(values []float32, offset, length int) alpEncodingParams { + best := alpEncodingParams{exponent: 0, factor: 0, numExceptions: length} + + for e := 0; e <= alpFloatMaxExponent; e++ { + for f := 0; f <= e; f++ { + exceptions := 0 + for i := 0; i < length; i++ { + if alpIsFloatException(values[offset+i], e, f) { + exceptions++ + } + } + if exceptions < best.numExceptions { + best = alpEncodingParams{exponent: e, factor: f, numExceptions: exceptions} + if best.numExceptions == 0 { + return best + } + } + } + } + return best +} + +func alpFindBestFloat32ParamsWithPresets(values []float32, offset, length int, presets [][2]int) alpEncodingParams { + best := alpEncodingParams{exponent: presets[0][0], factor: presets[0][1], numExceptions: length} + + for _, p := range presets { + e, f := p[0], p[1] + exceptions := 0 + for i := 0; i < length; i++ { + if alpIsFloatException(values[offset+i], e, f) { + exceptions++ + } + } + if exceptions < best.numExceptions { + best = alpEncodingParams{exponent: e, factor: f, numExceptions: exceptions} + if best.numExceptions == 0 { + return best + } + } + } + return best +} + +func alpFindBestFloat64Params(values []float64, offset, length int) alpEncodingParams { + best := alpEncodingParams{exponent: 0, factor: 0, numExceptions: length} + + for e := 0; e <= alpDoubleMaxExponent; e++ { + for f := 0; f <= e; f++ { + exceptions := 0 + for i := 0; i < length; i++ { + if alpIsDoubleException(values[offset+i], e, f) { + exceptions++ + } + } + if exceptions < best.numExceptions { + best = alpEncodingParams{exponent: e, factor: f, numExceptions: exceptions} + if best.numExceptions == 0 { + return best + } + } + } + } + return best +} + +func alpFindBestFloat64ParamsWithPresets(values []float64, offset, length int, presets [][2]int) alpEncodingParams { + best := alpEncodingParams{exponent: presets[0][0], factor: presets[0][1], numExceptions: length} + + for _, p := range presets { + e, f := p[0], p[1] + exceptions := 0 + for i := 0; i < length; i++ { + if alpIsDoubleException(values[offset+i], e, f) { + exceptions++ + } + } + if exceptions < best.numExceptions { + best = alpEncodingParams{exponent: e, factor: f, numExceptions: exceptions} + if best.numExceptions == 0 { + return best + } + } + } + return best +} +type alpEncoder[T float32 | float64] struct { + encoder + + vectorSize int + logVectorSize int + + vectorBuf []T + bufCount int + totalCount int + + encodedBuf []byte + vectorSizes []int + + vectorsProcessed int + cachedPresets [][2]int + presetCounts map[uint32]int +} + +func newAlpEncoder[T float32 | float64](e format.Encoding, descr *schema.Column, mem memory.Allocator) *alpEncoder[T] { + return &alpEncoder[T]{ + encoder: newEncoderBase(e, descr, mem), + vectorSize: alpDefaultVectorSize, + logVectorSize: alpDefaultLogVector, + vectorBuf: make([]T, alpDefaultVectorSize), + presetCounts: make(map[uint32]int), + } +} + +func (enc *alpEncoder[T]) Type() parquet.Type { + var z T + switch any(z).(type) { + case float32: + return parquet.Types.Float + case float64: + return parquet.Types.Double + } + panic("unreachable") +} + +func (enc *alpEncoder[T]) Put(in []T) { + for i := 0; i < len(in); { + space := enc.vectorSize - enc.bufCount + toCopy := len(in) - i + if toCopy > space { + toCopy = space + } + copy(enc.vectorBuf[enc.bufCount:], in[i:i+toCopy]) + enc.bufCount += toCopy + enc.totalCount += toCopy + i += toCopy + + if enc.bufCount == enc.vectorSize { + enc.encodeVector(enc.bufCount) + enc.bufCount = 0 + } + } +} + +func (enc *alpEncoder[T]) PutSpaced(in []T, validBits []byte, validBitsOffset int64) { + nbuf := make([]T, len(in)) + nvalid := spacedCompress(in, nbuf, validBits, validBitsOffset) + enc.Put(nbuf[:nvalid]) +} + +func (enc *alpEncoder[T]) encodeVector(vectorLen int) { + var z T + switch any(z).(type) { + case float32: + enc.encodeFloat32Vector(vectorLen) + case float64: + enc.encodeFloat64Vector(vectorLen) + } +} + +func (enc *alpEncoder[T]) encodeFloat32Vector(vectorLen int) { + values := make([]float32, vectorLen) + for i := 0; i < vectorLen; i++ { + values[i] = float32(any(enc.vectorBuf[i]).(float32)) + } + + var params alpEncodingParams + if enc.cachedPresets != nil { + params = alpFindBestFloat32ParamsWithPresets(values, 0, vectorLen, enc.cachedPresets) + } else { + params = alpFindBestFloat32Params(values, 0, vectorLen) + key := uint32(params.exponent<<16) | uint32(params.factor) + enc.presetCounts[key]++ + } + + enc.vectorsProcessed++ + if enc.cachedPresets == nil && enc.vectorsProcessed >= alpSamplerVectors { + enc.buildPresetCache() + } + + encoded := make([]int32, vectorLen) + excPositions := make([]uint16, 0, params.numExceptions) + excValues := make([]float32, 0, params.numExceptions) + + var placeholder int32 + for i := 0; i < vectorLen; i++ { + if !alpIsFloatException(values[i], params.exponent, params.factor) { + placeholder = alpEncodeFloat32(values[i], params.exponent, params.factor) + break + } + } + + minValue := int32(math.MaxInt32) + for i := 0; i < vectorLen; i++ { + if alpIsFloatException(values[i], params.exponent, params.factor) { + excPositions = append(excPositions, uint16(i)) + excValues = append(excValues, values[i]) + encoded[i] = placeholder + } else { + encoded[i] = alpEncodeFloat32(values[i], params.exponent, params.factor) + } + if encoded[i] < minValue { + minValue = encoded[i] + } + } + + var maxDelta uint32 + deltas := make([]uint32, vectorLen) + for i := 0; i < vectorLen; i++ { + deltas[i] = uint32(encoded[i] - minValue) // wrapping subtraction, result is unsigned + if deltas[i] > maxDelta { + maxDelta = deltas[i] + } + } + + bitWidth := alpBitWidth32(maxDelta) + + startLen := len(enc.encodedBuf) + + var alpInfo [alpInfoSize]byte + alpInfo[0] = byte(params.exponent) + alpInfo[1] = byte(params.factor) + binary.LittleEndian.PutUint16(alpInfo[2:], uint16(params.numExceptions)) + enc.encodedBuf = append(enc.encodedBuf, alpInfo[:]...) + + var forInfo [alpFloatForInfo]byte + binary.LittleEndian.PutUint32(forInfo[0:], uint32(minValue)) + forInfo[4] = byte(bitWidth) + enc.encodedBuf = append(enc.encodedBuf, forInfo[:]...) + + if bitWidth > 0 { + packed := alpPackBits32(deltas, vectorLen, bitWidth) + enc.encodedBuf = append(enc.encodedBuf, packed...) + } + + if params.numExceptions > 0 { + for _, pos := range excPositions { + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], pos) + enc.encodedBuf = append(enc.encodedBuf, buf[:]...) + } + for _, val := range excValues { + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], math.Float32bits(val)) + enc.encodedBuf = append(enc.encodedBuf, buf[:]...) + } + } + + enc.vectorSizes = append(enc.vectorSizes, len(enc.encodedBuf)-startLen) +} + +func (enc *alpEncoder[T]) encodeFloat64Vector(vectorLen int) { + values := make([]float64, vectorLen) + for i := 0; i < vectorLen; i++ { + values[i] = float64(any(enc.vectorBuf[i]).(float64)) + } + + var params alpEncodingParams + if enc.cachedPresets != nil { + params = alpFindBestFloat64ParamsWithPresets(values, 0, vectorLen, enc.cachedPresets) + } else { + params = alpFindBestFloat64Params(values, 0, vectorLen) + key := uint32(params.exponent<<16) | uint32(params.factor) + enc.presetCounts[key]++ + } + + enc.vectorsProcessed++ + if enc.cachedPresets == nil && enc.vectorsProcessed >= alpSamplerVectors { + enc.buildPresetCache() + } + + encoded := make([]int64, vectorLen) + excPositions := make([]uint16, 0, params.numExceptions) + excValues := make([]float64, 0, params.numExceptions) + + var placeholder int64 + for i := 0; i < vectorLen; i++ { + if !alpIsDoubleException(values[i], params.exponent, params.factor) { + placeholder = alpEncodeFloat64(values[i], params.exponent, params.factor) + break + } + } + + minValue := int64(math.MaxInt64) + for i := 0; i < vectorLen; i++ { + if alpIsDoubleException(values[i], params.exponent, params.factor) { + excPositions = append(excPositions, uint16(i)) + excValues = append(excValues, values[i]) + encoded[i] = placeholder + } else { + encoded[i] = alpEncodeFloat64(values[i], params.exponent, params.factor) + } + if encoded[i] < minValue { + minValue = encoded[i] + } + } + + var maxDelta uint64 + deltas := make([]uint64, vectorLen) + for i := 0; i < vectorLen; i++ { + deltas[i] = uint64(encoded[i] - minValue) + if deltas[i] > maxDelta { + maxDelta = deltas[i] + } + } + + bitWidth := alpBitWidth64(maxDelta) + + startLen := len(enc.encodedBuf) + + var alpInfo [alpInfoSize]byte + alpInfo[0] = byte(params.exponent) + alpInfo[1] = byte(params.factor) + binary.LittleEndian.PutUint16(alpInfo[2:], uint16(params.numExceptions)) + enc.encodedBuf = append(enc.encodedBuf, alpInfo[:]...) + + var forInfo [alpDoubleForInfo]byte + binary.LittleEndian.PutUint64(forInfo[0:], uint64(minValue)) + forInfo[8] = byte(bitWidth) + enc.encodedBuf = append(enc.encodedBuf, forInfo[:]...) + + if bitWidth > 0 { + packed := alpPackBits64(deltas, vectorLen, bitWidth) + enc.encodedBuf = append(enc.encodedBuf, packed...) + } + + if params.numExceptions > 0 { + for _, pos := range excPositions { + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], pos) + enc.encodedBuf = append(enc.encodedBuf, buf[:]...) + } + for _, val := range excValues { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(val)) + enc.encodedBuf = append(enc.encodedBuf, buf[:]...) + } + } + + enc.vectorSizes = append(enc.vectorSizes, len(enc.encodedBuf)-startLen) +} + +func (enc *alpEncoder[T]) buildPresetCache() { + type kv struct { + key uint32 + count int + } + sorted := make([]kv, 0, len(enc.presetCounts)) + for k, v := range enc.presetCounts { + sorted = append(sorted, kv{k, v}) + } + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].count > sorted[j].count + }) + + numPresets := len(sorted) + if numPresets > alpMaxPresetCombinations { + numPresets = alpMaxPresetCombinations + } + enc.cachedPresets = make([][2]int, numPresets) + for i := 0; i < numPresets; i++ { + enc.cachedPresets[i][0] = int(sorted[i].key >> 16) + enc.cachedPresets[i][1] = int(sorted[i].key & 0xFFFF) + } +} + +func (enc *alpEncoder[T]) EstimatedDataEncodedSize() int64 { + return int64(len(enc.encodedBuf)) + int64(enc.bufCount)*3 +} + +func (enc *alpEncoder[T]) FlushValues() (Buffer, error) { + if enc.bufCount > 0 { + enc.encodeVector(enc.bufCount) + enc.bufCount = 0 + } + + if enc.totalCount == 0 { + return enc.encoder.FlushValues() + } + + numVectors := len(enc.vectorSizes) + offsetArraySize := numVectors * 4 + totalSize := alpHeaderSize + offsetArraySize + len(enc.encodedBuf) + enc.sink.Reserve(totalSize) + + var header [alpHeaderSize]byte + header[0] = alpVersion + header[1] = alpCompressionMode + header[2] = alpIntegerEncodingFOR + header[3] = byte(enc.logVectorSize) + binary.LittleEndian.PutUint32(header[4:], uint32(enc.totalCount)) + enc.sink.Write(header[:]) + + currentOffset := uint32(offsetArraySize) + for _, size := range enc.vectorSizes { + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], currentOffset) + enc.sink.Write(buf[:]) + currentOffset += uint32(size) + } + + enc.sink.Write(enc.encodedBuf) + + return enc.sink.Finish(), nil +} + +func (enc *alpEncoder[T]) Reset() { + enc.encoder.Reset() + enc.bufCount = 0 + enc.totalCount = 0 + enc.encodedBuf = enc.encodedBuf[:0] + enc.vectorSizes = enc.vectorSizes[:0] + enc.vectorsProcessed = 0 + enc.cachedPresets = nil + enc.presetCounts = make(map[uint32]int) +} + +func (enc *alpEncoder[T]) Release() { + enc.encoder.Release() +} +type alpDecoder[T float32 | float64] struct { + decoder + + vectorSize int + totalCount int + numVectors int + + vectorOffsets []uint32 + bodyData []byte + offsetArraySize int + + currentIndex int + currentVectorIndex int + decodedValues []T +} + +func (dec *alpDecoder[T]) Type() parquet.Type { + var z T + switch any(z).(type) { + case float32: + return parquet.Types.Float + case float64: + return parquet.Types.Double + } + panic("unreachable") +} + +func (dec *alpDecoder[T]) SetData(nvals int, data []byte) error { + if len(data) < alpHeaderSize { + return fmt.Errorf("parquet: ALP data too short for header: %d bytes", len(data)) + } + + version := data[0] + compressionMode := data[1] + integerEncoding := data[2] + logVectorSize := data[3] + numElements := int32(binary.LittleEndian.Uint32(data[4:8])) + + if version != alpVersion { + return fmt.Errorf("parquet: unsupported ALP version: %d, expected %d", version, alpVersion) + } + if compressionMode != alpCompressionMode { + return fmt.Errorf("parquet: unsupported ALP compression mode: %d", compressionMode) + } + if integerEncoding != alpIntegerEncodingFOR { + return fmt.Errorf("parquet: unsupported ALP integer encoding: %d", integerEncoding) + } + if logVectorSize < alpMinLogVectorSize || logVectorSize > alpMaxLogVectorSize { + return fmt.Errorf("parquet: invalid ALP log vector size: %d, must be between %d and %d", + logVectorSize, alpMinLogVectorSize, alpMaxLogVectorSize) + } + if numElements < 0 { + return fmt.Errorf("parquet: invalid ALP element count: %d", numElements) + } + + dec.vectorSize = 1 << uint(logVectorSize) + dec.totalCount = int(numElements) + dec.numVectors = (dec.totalCount + dec.vectorSize - 1) / dec.vectorSize + dec.currentIndex = 0 + dec.currentVectorIndex = -1 + dec.nvals = nvals + + dec.offsetArraySize = dec.numVectors * 4 + offsetEnd := alpHeaderSize + dec.offsetArraySize + if len(data) < offsetEnd { + return fmt.Errorf("parquet: ALP data too short for offset array: need %d, have %d", + offsetEnd, len(data)) + } + + dec.vectorOffsets = make([]uint32, dec.numVectors) + for i := 0; i < dec.numVectors; i++ { + dec.vectorOffsets[i] = binary.LittleEndian.Uint32(data[alpHeaderSize+i*4:]) + } + + dec.bodyData = data[alpHeaderSize:] + dec.decodedValues = make([]T, dec.vectorSize) + + return nil +} + +func (dec *alpDecoder[T]) getVectorLength(vectorIdx int) int { + if vectorIdx < dec.numVectors-1 { + return dec.vectorSize + } + lastLen := dec.totalCount % dec.vectorSize + if lastLen == 0 { + return dec.vectorSize + } + return lastLen +} + +func (dec *alpDecoder[T]) getVectorDataPos(vectorIdx int) int { + return int(dec.vectorOffsets[vectorIdx]) +} + +func (dec *alpDecoder[T]) decodeVector(vectorIdx int) { + var z T + switch any(z).(type) { + case float32: + dec.decodeFloat32Vector(vectorIdx) + case float64: + dec.decodeFloat64Vector(vectorIdx) + } +} + +func (dec *alpDecoder[T]) decodeFloat32Vector(vectorIdx int) { + vectorLen := dec.getVectorLength(vectorIdx) + pos := dec.getVectorDataPos(vectorIdx) + + exponent := int(dec.bodyData[pos]) + factor := int(dec.bodyData[pos+1]) + numExceptions := int(binary.LittleEndian.Uint16(dec.bodyData[pos+2:])) + pos += alpInfoSize + + frameOfRef := int32(binary.LittleEndian.Uint32(dec.bodyData[pos:])) + bitWidth := int(dec.bodyData[pos+4]) + pos += alpFloatForInfo + + var deltas []uint32 + if bitWidth > 0 { + packedSize := (vectorLen*bitWidth + 7) / 8 + deltas = alpUnpackBits32(dec.bodyData[pos:pos+packedSize], vectorLen, bitWidth) + pos += packedSize + } else { + deltas = make([]uint32, vectorLen) + } + + outSlice := any(&dec.decodedValues).(*[]float32) + for i := 0; i < vectorLen; i++ { + encoded := int32(deltas[i]) + frameOfRef + (*outSlice)[i] = alpDecodeFloat32(encoded, exponent, factor) + } + + if numExceptions > 0 { + excPositions := make([]uint16, numExceptions) + for e := 0; e < numExceptions; e++ { + excPositions[e] = binary.LittleEndian.Uint16(dec.bodyData[pos:]) + pos += 2 + } + for e := 0; e < numExceptions; e++ { + bits := binary.LittleEndian.Uint32(dec.bodyData[pos:]) + (*outSlice)[excPositions[e]] = math.Float32frombits(bits) + pos += 4 + } + } +} + +func (dec *alpDecoder[T]) decodeFloat64Vector(vectorIdx int) { + vectorLen := dec.getVectorLength(vectorIdx) + pos := dec.getVectorDataPos(vectorIdx) + + exponent := int(dec.bodyData[pos]) + factor := int(dec.bodyData[pos+1]) + numExceptions := int(binary.LittleEndian.Uint16(dec.bodyData[pos+2:])) + pos += alpInfoSize + + frameOfRef := int64(binary.LittleEndian.Uint64(dec.bodyData[pos:])) + bitWidth := int(dec.bodyData[pos+8]) + pos += alpDoubleForInfo + + var deltas []uint64 + if bitWidth > 0 { + packedSize := (vectorLen*bitWidth + 7) / 8 + deltas = alpUnpackBits64(dec.bodyData[pos:pos+packedSize], vectorLen, bitWidth) + pos += packedSize + } else { + deltas = make([]uint64, vectorLen) + } + + outSlice := any(&dec.decodedValues).(*[]float64) + for i := 0; i < vectorLen; i++ { + encoded := int64(deltas[i]) + frameOfRef + (*outSlice)[i] = alpDecodeFloat64(encoded, exponent, factor) + } + + if numExceptions > 0 { + excPositions := make([]uint16, numExceptions) + for e := 0; e < numExceptions; e++ { + excPositions[e] = binary.LittleEndian.Uint16(dec.bodyData[pos:]) + pos += 2 + } + for e := 0; e < numExceptions; e++ { + bits := binary.LittleEndian.Uint64(dec.bodyData[pos:]) + (*outSlice)[excPositions[e]] = math.Float64frombits(bits) + pos += 8 + } + } +} + +func (dec *alpDecoder[T]) Decode(out []T) (int, error) { + toRead := len(out) + if toRead > dec.nvals { + toRead = dec.nvals + } + + read := 0 + for read < toRead { + // Determine which vector we're reading from + vectorIdx := dec.currentIndex / dec.vectorSize + if vectorIdx >= dec.numVectors { + break + } + + // Ensure current vector is decoded + if vectorIdx != dec.currentVectorIndex { + dec.decodeVector(vectorIdx) + dec.currentVectorIndex = vectorIdx + } + + // Copy from decoded buffer + indexInVector := dec.currentIndex % dec.vectorSize + vectorLen := dec.getVectorLength(vectorIdx) + available := vectorLen - indexInVector + toCopy := toRead - read + if toCopy > available { + toCopy = available + } + + copy(out[read:], dec.decodedValues[indexInVector:indexInVector+toCopy]) + read += toCopy + dec.currentIndex += toCopy + } + + dec.nvals -= read + return read, nil +} + +func (dec *alpDecoder[T]) DecodeSpaced(out []T, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + toRead := len(out) - nullCount + valuesRead, err := dec.Decode(out[:toRead]) + if err != nil { + return valuesRead, err + } + if valuesRead != toRead { + return valuesRead, xerrors.New("parquet: number of values / definitions levels read did not match") + } + + return spacedExpand(out, nullCount, validBits, validBitsOffset), nil +} + +func (dec *alpDecoder[T]) Discard(n int) (int, error) { + if n > dec.nvals { + n = dec.nvals + } + dec.nvals -= n + dec.currentIndex += n + return n, nil +} +type AlpFloat32Encoder = alpEncoder[float32] +type AlpFloat64Encoder = alpEncoder[float64] +type AlpFloat32Decoder = alpDecoder[float32] +type AlpFloat64Decoder = alpDecoder[float64] diff --git a/parquet/internal/encoding/alp_encoding_test.go b/parquet/internal/encoding/alp_encoding_test.go new file mode 100644 index 000000000..5fe29f6f2 --- /dev/null +++ b/parquet/internal/encoding/alp_encoding_test.go @@ -0,0 +1,1306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +import ( + "encoding/binary" + "fmt" + "math" + "testing" + + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/parquet" + format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet" + "github.com/apache/arrow-go/v18/parquet/schema" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAlpFastRoundFloat32(t *testing.T) { + tests := []struct { + name string + input float32 + expected int32 + }{ + {"positive integer", 5.0, 5}, + {"positive round up", 2.7, 3}, + {"positive round down", 2.3, 2}, + {"positive half", 2.5, 2}, // round-to-even via magic trick + {"negative integer", -5.0, -5}, + {"negative round up", -2.3, -2}, + {"negative round down", -2.7, -3}, + {"zero", 0.0, 0}, + {"small positive", 0.1, 0}, + {"large integer", 12345.0, 12345}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := alpFastRoundFloat32(tt.input) + assert.Equal(t, tt.expected, result, "alpFastRoundFloat32(%v) = %d, want %d", tt.input, result, tt.expected) + }) + } +} + +func TestAlpFastRoundFloat64(t *testing.T) { + tests := []struct { + name string + input float64 + expected int64 + }{ + {"positive integer", 5.0, 5}, + {"positive round up", 2.7, 3}, + {"positive round down", 2.3, 2}, + {"negative integer", -5.0, -5}, + {"negative round up", -2.3, -2}, + {"negative round down", -2.7, -3}, + {"zero", 0.0, 0}, + {"small positive", 0.1, 0}, + {"large integer", 123456789.0, 123456789}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := alpFastRoundFloat64(tt.input) + assert.Equal(t, tt.expected, result, "alpFastRoundFloat64(%v) = %d, want %d", tt.input, result, tt.expected) + }) + } +} + +func TestAlpIsBasicFloatException(t *testing.T) { + assert.True(t, alpIsBasicFloatException(float32(math.NaN())), "NaN should be exception") + assert.True(t, alpIsBasicFloatException(float32(math.Inf(1))), "+Inf should be exception") + assert.True(t, alpIsBasicFloatException(float32(math.Inf(-1))), "-Inf should be exception") + negZero := math.Float32frombits(alpNegZeroFloat32Bits) + assert.True(t, alpIsBasicFloatException(negZero), "-0.0 should be exception") + assert.False(t, alpIsBasicFloatException(0.0), "+0.0 should not be exception") + assert.False(t, alpIsBasicFloatException(1.0), "1.0 should not be exception") + assert.False(t, alpIsBasicFloatException(-1.0), "-1.0 should not be exception") +} + +func TestAlpIsBasicDoubleException(t *testing.T) { + assert.True(t, alpIsBasicDoubleException(math.NaN()), "NaN should be exception") + assert.True(t, alpIsBasicDoubleException(math.Inf(1)), "+Inf should be exception") + assert.True(t, alpIsBasicDoubleException(math.Inf(-1)), "-Inf should be exception") + negZero := math.Float64frombits(alpNegZeroFloat64Bits) + assert.True(t, alpIsBasicDoubleException(negZero), "-0.0 should be exception") + assert.False(t, alpIsBasicDoubleException(0.0), "+0.0 should not be exception") + assert.False(t, alpIsBasicDoubleException(1.0), "1.0 should not be exception") + assert.False(t, alpIsBasicDoubleException(-1.0), "-1.0 should not be exception") +} + +func TestAlpIsFloatException(t *testing.T) { + assert.True(t, alpIsFloatException(float32(math.NaN()), 2, 0)) + assert.True(t, alpIsFloatException(float32(math.Inf(1)), 2, 0)) + negZero := math.Float32frombits(alpNegZeroFloat32Bits) + assert.True(t, alpIsFloatException(negZero, 2, 0)) + + assert.False(t, alpIsFloatException(1.23, 2, 0), "1.23 with exp=2 should encode to 123") + assert.False(t, alpIsFloatException(0.0, 2, 0), "0.0 should not be exception") + assert.False(t, alpIsFloatException(42.0, 0, 0), "42.0 with exp=0 should encode to 42") + + assert.True(t, alpIsFloatException(float32(1e30), 10, 0), + "1e30 * 10^10 overflows int32") +} + +func TestAlpIsDoubleException(t *testing.T) { + assert.True(t, alpIsDoubleException(math.NaN(), 2, 0)) + assert.True(t, alpIsDoubleException(math.Inf(1), 2, 0)) + negZero := math.Float64frombits(alpNegZeroFloat64Bits) + assert.True(t, alpIsDoubleException(negZero, 2, 0)) + + assert.False(t, alpIsDoubleException(1.23, 2, 0), "1.23 with exp=2 should encode to 123") + assert.False(t, alpIsDoubleException(0.0, 2, 0), "0.0 should not be exception") + assert.False(t, alpIsDoubleException(42.0, 0, 0), "42.0 with exp=0 should encode to 42") +} + +func TestAlpEncodeDecodeFloat32(t *testing.T) { + tests := []struct { + name string + value float32 + exponent int + factor int + encoded int32 + }{ + {"integer no scaling", 42.0, 0, 0, 42}, + {"one decimal", 1.5, 1, 0, 15}, + {"two decimals", 1.23, 2, 0, 123}, + {"negative", -1.23, 2, 0, -123}, + {"zero", 0.0, 5, 0, 0}, + {"with factor", 1230.0, 5, 2, 1230000}, // 1230 * 10^5 / 10^2 = 1230 * 1000 + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + encoded := alpEncodeFloat32(tt.value, tt.exponent, tt.factor) + assert.Equal(t, tt.encoded, encoded, "encode(%v, exp=%d, fac=%d)", tt.value, tt.exponent, tt.factor) + + decoded := alpDecodeFloat32(encoded, tt.exponent, tt.factor) + assert.Equal(t, math.Float32bits(tt.value), math.Float32bits(decoded), + "decode(%d, exp=%d, fac=%d) = %v, want %v", encoded, tt.exponent, tt.factor, decoded, tt.value) + }) + } +} + +func TestAlpEncodeDecodeFloat64(t *testing.T) { + tests := []struct { + name string + value float64 + exponent int + factor int + encoded int64 + }{ + {"integer no scaling", 42.0, 0, 0, 42}, + {"one decimal", 1.5, 1, 0, 15}, + {"two decimals", 1.23, 2, 0, 123}, + {"negative", -1.23, 2, 0, -123}, + {"zero", 0.0, 5, 0, 0}, + {"with factor", 1230.0, 5, 2, 1230000}, // 1230 * 10^5 / 10^2 = 1230 * 1000 + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + encoded := alpEncodeFloat64(tt.value, tt.exponent, tt.factor) + assert.Equal(t, tt.encoded, encoded, "encode(%v, exp=%d, fac=%d)", tt.value, tt.exponent, tt.factor) + + decoded := alpDecodeFloat64(encoded, tt.exponent, tt.factor) + assert.Equal(t, math.Float64bits(tt.value), math.Float64bits(decoded), + "decode(%d, exp=%d, fac=%d) = %v, want %v", encoded, tt.exponent, tt.factor, decoded, tt.value) + }) + } +} + +func TestAlpMultiplier(t *testing.T) { + assert.Equal(t, float32(1.0), alpFloatMultiplier(0, 0)) + assert.Equal(t, float32(100.0), alpFloatMultiplier(2, 0)) + assert.Equal(t, float32(10.0), alpFloatMultiplier(2, 1)) // 100/10 = 10 + assert.Equal(t, float32(1.0), alpFloatMultiplier(2, 2)) // 100/100 = 1 + assert.Equal(t, float32(1e10), alpFloatMultiplier(10, 0)) + + assert.Equal(t, float64(1.0), alpDoubleMultiplier(0, 0)) + assert.Equal(t, float64(100.0), alpDoubleMultiplier(2, 0)) + assert.Equal(t, float64(10.0), alpDoubleMultiplier(2, 1)) + assert.Equal(t, float64(1e18), alpDoubleMultiplier(18, 0)) +} +func TestAlpBitWidth(t *testing.T) { + assert.Equal(t, 0, alpBitWidth32(0)) + assert.Equal(t, 1, alpBitWidth32(1)) + assert.Equal(t, 2, alpBitWidth32(2)) + assert.Equal(t, 2, alpBitWidth32(3)) + assert.Equal(t, 8, alpBitWidth32(255)) + assert.Equal(t, 9, alpBitWidth32(256)) + assert.Equal(t, 32, alpBitWidth32(math.MaxUint32)) + + assert.Equal(t, 0, alpBitWidth64(0)) + assert.Equal(t, 1, alpBitWidth64(1)) + assert.Equal(t, 8, alpBitWidth64(255)) + assert.Equal(t, 64, alpBitWidth64(math.MaxUint64)) +} + +func TestAlpPackUnpackBits32(t *testing.T) { + tests := []struct { + name string + values []uint32 + bitWidth int + }{ + {"1-bit values", []uint32{0, 1, 0, 1, 1, 0, 1, 0}, 1}, + {"4-bit values", []uint32{0, 1, 5, 10, 15, 3, 7, 12}, 4}, + {"8-bit values", []uint32{0, 128, 255, 1, 42, 100, 200, 50}, 8}, + {"16-bit values", []uint32{0, 1000, 50000, 65535, 32768, 1, 256, 2048}, 16}, + {"32-bit values", []uint32{0, math.MaxUint32, 12345, 67890}, 32}, + {"zero bit width", []uint32{0, 0, 0, 0}, 0}, + {"single value", []uint32{42}, 6}, + {"mixed bit widths", []uint32{0, 1, 2, 3, 4, 5, 6, 7}, 3}, + {"large count", func() []uint32 { + v := make([]uint32, 1024) + for i := range v { + v[i] = uint32(i % 256) + } + return v + }(), 8}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + packed := alpPackBits32(tt.values, len(tt.values), tt.bitWidth) + unpacked := alpUnpackBits32(packed, len(tt.values), tt.bitWidth) + assert.Equal(t, tt.values, unpacked, "pack/unpack round-trip failed") + }) + } +} + +func TestAlpPackUnpackBits64(t *testing.T) { + tests := []struct { + name string + values []uint64 + bitWidth int + }{ + {"1-bit values", []uint64{0, 1, 0, 1, 1, 0, 1, 0}, 1}, + {"8-bit values", []uint64{0, 128, 255, 1, 42, 100, 200, 50}, 8}, + {"32-bit values", []uint64{0, math.MaxUint32, 12345, 67890}, 32}, + {"48-bit values", []uint64{0, 1 << 47, 12345678901234, 1}, 48}, + {"64-bit values", []uint64{0, math.MaxUint64, 12345, 67890}, 64}, + {"zero bit width", []uint64{0, 0, 0, 0}, 0}, + {"single value", []uint64{42}, 6}, + {"large count", func() []uint64 { + v := make([]uint64, 1024) + for i := range v { + v[i] = uint64(i * 1000) + } + return v + }(), 20}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + packed := alpPackBits64(tt.values, len(tt.values), tt.bitWidth) + unpacked := alpUnpackBits64(packed, len(tt.values), tt.bitWidth) + assert.Equal(t, tt.values, unpacked, "pack/unpack round-trip failed") + }) + } +} + +func TestAlpPackBits32KnownOutput(t *testing.T) { + values := []uint32{0, 1, 2, 3} + packed := alpPackBits32(values, 4, 2) + require.Len(t, packed, 1) + assert.Equal(t, byte(0xE4), packed[0]) +} + +func TestAlpPackBits64KnownOutput(t *testing.T) { + values := []uint64{0, 1, 2, 3} + packed := alpPackBits64(values, 4, 2) + require.Len(t, packed, 1) + assert.Equal(t, byte(0xE4), packed[0]) +} +func TestAlpFindBestFloat32Params(t *testing.T) { + values := []float32{1.23, 4.56, 7.89, 10.11, 12.13} + params := alpFindBestFloat32Params(values, 0, len(values)) + assert.Equal(t, 0, params.numExceptions, "clean monetary data should have 0 exceptions") + assert.Equal(t, 2, params.exponent) + assert.Equal(t, 0, params.factor) +} + +func TestAlpFindBestFloat64Params(t *testing.T) { + values := []float64{1.23, 4.56, 7.89, 10.11, 12.13} + params := alpFindBestFloat64Params(values, 0, len(values)) + assert.Equal(t, 0, params.numExceptions, "clean monetary data should have 0 exceptions") + assert.Equal(t, 2, params.exponent) + assert.Equal(t, 0, params.factor) +} + +func TestAlpFindBestParamsWithExceptions(t *testing.T) { + values := []float32{ + 1.23, 4.56, float32(math.Inf(1)), 7.89, + math.Float32frombits(alpNegZeroFloat32Bits), 10.11, + } + params := alpFindBestFloat32Params(values, 0, len(values)) + assert.Equal(t, 2, params.numExceptions) +} + +func TestAlpFindBestFloat32ParamsWithPresets(t *testing.T) { + values := []float32{1.23, 4.56, 7.89} + presets := [][2]int{{2, 0}, {3, 0}, {1, 0}} + params := alpFindBestFloat32ParamsWithPresets(values, 0, len(values), presets) + assert.Equal(t, 0, params.numExceptions) + assert.Equal(t, 2, params.exponent) +} + +func TestAlpFindBestFloat64ParamsWithPresets(t *testing.T) { + values := []float64{1.23, 4.56, 7.89} + presets := [][2]int{{2, 0}, {3, 0}, {1, 0}} + params := alpFindBestFloat64ParamsWithPresets(values, 0, len(values), presets) + assert.Equal(t, 0, params.numExceptions) + assert.Equal(t, 2, params.exponent) +} + +func TestAlpFindBestParamsIntegerData(t *testing.T) { + values := []float32{1.0, 2.0, 3.0, 100.0, 200.0} + params := alpFindBestFloat32Params(values, 0, len(values)) + assert.Equal(t, 0, params.numExceptions) + assert.Equal(t, 0, params.exponent) + assert.Equal(t, 0, params.factor) +} + +func TestAlpFindBestParamsAllExceptions(t *testing.T) { + inf := float32(math.Inf(1)) + ninf := float32(math.Inf(-1)) + negZero := math.Float32frombits(alpNegZeroFloat32Bits) + values := []float32{inf, ninf, negZero, inf} + params := alpFindBestFloat32Params(values, 0, len(values)) + assert.Equal(t, 4, params.numExceptions, "all values are exceptions") +} +func newFloat32Column() *schema.Column { + return schema.NewColumn(schema.NewFloat32Node("float32", parquet.Repetitions.Required, -1), 0, 0) +} + +func newFloat64Column() *schema.Column { + return schema.NewColumn(schema.NewFloat64Node("float64", parquet.Repetitions.Required, -1), 0, 0) +} + +func alpFloat32RoundTrip(t *testing.T, values []float32) { + t.Helper() + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + out := make([]float32, len(values)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out[i]), + "index %d: got %v (bits %08x), want %v (bits %08x)", + i, out[i], math.Float32bits(out[i]), v, math.Float32bits(v)) + } +} + +func alpFloat64RoundTrip(t *testing.T, values []float64) { + t.Helper() + col := newFloat64Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float64](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float64]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + out := make([]float64, len(values)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float64bits(v), math.Float64bits(out[i]), + "index %d: got %v (bits %016x), want %v (bits %016x)", + i, out[i], math.Float64bits(out[i]), v, math.Float64bits(v)) + } +} + +func TestAlpFloat32MonetaryRoundTrip(t *testing.T) { + values := make([]float32, 2000) + for i := range values { + values[i] = float32(i) * 0.01 + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64MonetaryRoundTrip(t *testing.T) { + values := make([]float64, 2000) + for i := range values { + values[i] = float64(i) * 0.01 + } + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32IntegerRoundTrip(t *testing.T) { + values := make([]float32, 1500) + for i := range values { + values[i] = float32(i) + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64IntegerRoundTrip(t *testing.T) { + values := make([]float64, 1500) + for i := range values { + values[i] = float64(i) + } + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32ExceptionsRoundTrip(t *testing.T) { + negZero := math.Float32frombits(alpNegZeroFloat32Bits) + inf := float32(math.Inf(1)) + ninf := float32(math.Inf(-1)) + + values := []float32{ + 1.23, 4.56, inf, 7.89, negZero, 10.11, + ninf, 12.13, 14.15, 16.17, + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64ExceptionsRoundTrip(t *testing.T) { + negZero := math.Float64frombits(alpNegZeroFloat64Bits) + inf := math.Inf(1) + ninf := math.Inf(-1) + + values := []float64{ + 1.23, 4.56, inf, 7.89, negZero, 10.11, + ninf, 12.13, 14.15, 16.17, + } + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32AllExceptionsRoundTrip(t *testing.T) { + negZero := math.Float32frombits(alpNegZeroFloat32Bits) + inf := float32(math.Inf(1)) + ninf := float32(math.Inf(-1)) + + values := []float32{inf, ninf, negZero, inf, ninf, negZero, inf, ninf} + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64AllExceptionsRoundTrip(t *testing.T) { + negZero := math.Float64frombits(alpNegZeroFloat64Bits) + inf := math.Inf(1) + ninf := math.Inf(-1) + + values := []float64{inf, ninf, negZero, inf, ninf, negZero, inf, ninf} + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32SingleValueRoundTrip(t *testing.T) { + alpFloat32RoundTrip(t, []float32{42.0}) + alpFloat32RoundTrip(t, []float32{0.0}) + alpFloat32RoundTrip(t, []float32{-1.23}) +} + +func TestAlpFloat64SingleValueRoundTrip(t *testing.T) { + alpFloat64RoundTrip(t, []float64{42.0}) + alpFloat64RoundTrip(t, []float64{0.0}) + alpFloat64RoundTrip(t, []float64{-1.23}) +} + +func TestAlpFloat32PartialVectorRoundTrip(t *testing.T) { + for _, count := range []int{1, 2, 3, 7, 100, 500, 1023} { + t.Run(fmt.Sprintf("count_%d", count), func(t *testing.T) { + values := make([]float32, count) + for i := range values { + values[i] = float32(i) * 0.1 + } + alpFloat32RoundTrip(t, values) + }) + } +} + +func TestAlpFloat64PartialVectorRoundTrip(t *testing.T) { + for _, count := range []int{1, 2, 3, 7, 100, 500, 1023} { + t.Run(fmt.Sprintf("count_%d", count), func(t *testing.T) { + values := make([]float64, count) + for i := range values { + values[i] = float64(i) * 0.1 + } + alpFloat64RoundTrip(t, values) + }) + } +} + +func TestAlpFloat32ExactVectorSizeRoundTrip(t *testing.T) { + values := make([]float32, 1024) + for i := range values { + values[i] = float32(i) * 0.01 + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64ExactVectorSizeRoundTrip(t *testing.T) { + values := make([]float64, 1024) + for i := range values { + values[i] = float64(i) * 0.01 + } + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32MultipleVectorsRoundTrip(t *testing.T) { + for _, count := range []int{1025, 2048, 3000, 5000, 10000} { + t.Run(fmt.Sprintf("count_%d", count), func(t *testing.T) { + values := make([]float32, count) + for i := range values { + values[i] = float32(i) * 0.01 + } + alpFloat32RoundTrip(t, values) + }) + } +} + +func TestAlpFloat64MultipleVectorsRoundTrip(t *testing.T) { + for _, count := range []int{1025, 2048, 3000, 5000, 10000} { + t.Run(fmt.Sprintf("count_%d", count), func(t *testing.T) { + values := make([]float64, count) + for i := range values { + values[i] = float64(i) * 0.01 + } + alpFloat64RoundTrip(t, values) + }) + } +} + +func TestAlpFloat32ConstantValueRoundTrip(t *testing.T) { + values := make([]float32, 1500) + for i := range values { + values[i] = 3.14 + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64ConstantValueRoundTrip(t *testing.T) { + values := make([]float64, 1500) + for i := range values { + values[i] = 3.14 + } + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32ZeroValuesRoundTrip(t *testing.T) { + values := make([]float32, 1024) + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64ZeroValuesRoundTrip(t *testing.T) { + values := make([]float64, 1024) + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32NegativeValuesRoundTrip(t *testing.T) { + values := make([]float32, 1500) + for i := range values { + values[i] = -float32(i) * 0.01 + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64NegativeValuesRoundTrip(t *testing.T) { + values := make([]float64, 1500) + for i := range values { + values[i] = -float64(i) * 0.01 + } + alpFloat64RoundTrip(t, values) +} + +func TestAlpFloat32ScientificRoundTrip(t *testing.T) { + values := make([]float32, 2000) + for i := range values { + values[i] = float32(i)*0.001 + 273.15 // Kelvin temperatures + } + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64ScientificRoundTrip(t *testing.T) { + values := make([]float64, 2000) + for i := range values { + values[i] = float64(i)*0.001 + 273.15 + } + alpFloat64RoundTrip(t, values) +} +func TestAlpFloat32ProgressiveDecoding(t *testing.T) { + values := make([]float32, 3000) + for i := range values { + values[i] = float32(i) * 0.01 + } + + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + chunkSize := 100 + totalRead := 0 + for totalRead < len(values) { + remaining := len(values) - totalRead + toRead := chunkSize + if toRead > remaining { + toRead = remaining + } + out := make([]float32, toRead) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, toRead, n) + + for i := 0; i < n; i++ { + assert.Equal(t, math.Float32bits(values[totalRead+i]), math.Float32bits(out[i]), + "index %d: got %v, want %v", totalRead+i, out[i], values[totalRead+i]) + } + totalRead += n + } + assert.Equal(t, len(values), totalRead) +} + +func TestAlpFloat64ProgressiveDecoding(t *testing.T) { + values := make([]float64, 3000) + for i := range values { + values[i] = float64(i) * 0.01 + } + + col := newFloat64Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float64](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float64]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + chunkSize := 100 + totalRead := 0 + for totalRead < len(values) { + remaining := len(values) - totalRead + toRead := chunkSize + if toRead > remaining { + toRead = remaining + } + out := make([]float64, toRead) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, toRead, n) + + for i := 0; i < n; i++ { + assert.Equal(t, math.Float64bits(values[totalRead+i]), math.Float64bits(out[i]), + "index %d: got %v, want %v", totalRead+i, out[i], values[totalRead+i]) + } + totalRead += n + } + assert.Equal(t, len(values), totalRead) +} +func TestAlpFloat32Discard(t *testing.T) { + values := make([]float32, 3000) + for i := range values { + values[i] = float32(i) * 0.01 + } + + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + discarded, err := dec.Discard(1500) + require.NoError(t, err) + assert.Equal(t, 1500, discarded) + + out := make([]float32, 1500) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, 1500, n) + + for i := 0; i < n; i++ { + assert.Equal(t, math.Float32bits(values[1500+i]), math.Float32bits(out[i]), + "index %d: got %v, want %v", 1500+i, out[i], values[1500+i]) + } +} + +func TestAlpFloat64Discard(t *testing.T) { + values := make([]float64, 3000) + for i := range values { + values[i] = float64(i) * 0.01 + } + + col := newFloat64Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float64](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float64]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + discarded, err := dec.Discard(1500) + require.NoError(t, err) + assert.Equal(t, 1500, discarded) + + out := make([]float64, 1500) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, 1500, n) + + for i := 0; i < n; i++ { + assert.Equal(t, math.Float64bits(values[1500+i]), math.Float64bits(out[i]), + "index %d: got %v, want %v", 1500+i, out[i], values[1500+i]) + } +} + +func TestAlpDiscardMoreThanAvailable(t *testing.T) { + values := []float32{1.0, 2.0, 3.0} + + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + discarded, err := dec.Discard(100) + require.NoError(t, err) + assert.Equal(t, 3, discarded, "should only discard available values") +} +func TestAlpFloat32MultiplePages(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + for page := 0; page < 5; page++ { + values := make([]float32, 500) + for i := range values { + values[i] = float32(page*500+i) * 0.01 + } + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + out := make([]float32, len(values)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out[i]), + "page %d, index %d: mismatch", page, i) + } + + buf.Release() + } +} + +func TestAlpFloat64MultiplePages(t *testing.T) { + col := newFloat64Column() + mem := memory.DefaultAllocator + + dec := &alpDecoder[float64]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + for page := 0; page < 5; page++ { + values := make([]float64, 500) + for i := range values { + values[i] = float64(page*500+i) * 0.01 + } + + enc := newAlpEncoder[float64](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + out := make([]float64, len(values)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float64bits(v), math.Float64bits(out[i]), + "page %d, index %d: mismatch", page, i) + } + + buf.Release() + } +} +func TestAlpEncoderReset(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + + values1 := []float32{1.23, 4.56, 7.89} + enc.Put(values1) + buf1, err := enc.FlushValues() + require.NoError(t, err) + defer buf1.Release() + + enc.Reset() + values2 := []float32{10.11, 12.13} + enc.Put(values2) + buf2, err := enc.FlushValues() + require.NoError(t, err) + defer buf2.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values1), buf1.Bytes()) + require.NoError(t, err) + out1 := make([]float32, len(values1)) + n, err := dec.Decode(out1) + require.NoError(t, err) + assert.Equal(t, len(values1), n) + for i, v := range values1 { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out1[i])) + } + + err = dec.SetData(len(values2), buf2.Bytes()) + require.NoError(t, err) + out2 := make([]float32, len(values2)) + n, err = dec.Decode(out2) + require.NoError(t, err) + assert.Equal(t, len(values2), n) + for i, v := range values2 { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out2[i])) + } +} + +func TestAlpEncoderType(t *testing.T) { + col32 := newFloat32Column() + col64 := newFloat64Column() + mem := memory.DefaultAllocator + + enc32 := newAlpEncoder[float32](format.Encoding_ALP, col32, mem) + assert.Equal(t, parquet.Types.Float, enc32.Type()) + + enc64 := newAlpEncoder[float64](format.Encoding_ALP, col64, mem) + assert.Equal(t, parquet.Types.Double, enc64.Type()) +} + +func TestAlpDecoderType(t *testing.T) { + col32 := newFloat32Column() + col64 := newFloat64Column() + + dec32 := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col32)} + assert.Equal(t, parquet.Types.Float, dec32.Type()) + + dec64 := &alpDecoder[float64]{decoder: newDecoderBase(format.Encoding_ALP, col64)} + assert.Equal(t, parquet.Types.Double, dec64.Type()) +} + +func TestAlpEncoderPutSpaced(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + values := []float32{1.0, 2.0, 0.0, 4.0, 0.0, 6.0, 7.0, 0.0} + validBits := []byte{0b01101011} // bits 0,1,3,5,6 set + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.PutSpaced(values, validBits, 0) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + expected := []float32{1.0, 2.0, 4.0, 6.0, 7.0} + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(expected), buf.Bytes()) + require.NoError(t, err) + + out := make([]float32, len(expected)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(expected), n) + + for i, v := range expected { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out[i]), + "index %d: got %v, want %v", i, out[i], v) + } +} +func TestAlpHeaderFormat(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + values := make([]float32, 100) + for i := range values { + values[i] = float32(i) + } + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + data := buf.Bytes() + require.True(t, len(data) >= alpHeaderSize, "encoded data should be at least header size") + + assert.Equal(t, byte(alpVersion), data[0], "version") + assert.Equal(t, byte(alpCompressionMode), data[1], "compression mode") + assert.Equal(t, byte(alpIntegerEncodingFOR), data[2], "integer encoding") + assert.Equal(t, byte(alpDefaultLogVector), data[3], "log vector size") + + numElements := binary.LittleEndian.Uint32(data[4:8]) + assert.Equal(t, uint32(100), numElements, "element count") +} + +func TestAlpHeaderFormatMultipleVectors(t *testing.T) { + col := newFloat64Column() + mem := memory.DefaultAllocator + + values := make([]float64, 3000) + for i := range values { + values[i] = float64(i) * 0.01 + } + + enc := newAlpEncoder[float64](format.Encoding_ALP, col, mem) + enc.Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + data := buf.Bytes() + require.True(t, len(data) >= alpHeaderSize) + + numElements := binary.LittleEndian.Uint32(data[4:8]) + assert.Equal(t, uint32(3000), numElements) + + numVectors := (3000 + 1023) / 1024 + assert.Equal(t, 3, numVectors) + + offsetArraySize := numVectors * 4 + require.True(t, len(data) >= alpHeaderSize+offsetArraySize) + + firstOffset := binary.LittleEndian.Uint32(data[alpHeaderSize:]) + assert.Equal(t, uint32(offsetArraySize), firstOffset, + "first vector offset should equal offset array size") + + for i := 1; i < numVectors; i++ { + offset := binary.LittleEndian.Uint32(data[alpHeaderSize+i*4:]) + prevOffset := binary.LittleEndian.Uint32(data[alpHeaderSize+(i-1)*4:]) + assert.True(t, offset > prevOffset, + "offset[%d]=%d should be greater than offset[%d]=%d", i, offset, i-1, prevOffset) + } +} +func TestAlpDecoderInvalidData(t *testing.T) { + col := newFloat32Column() + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + err := dec.SetData(10, []byte{1, 2, 3}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "too short") +} + +func TestAlpDecoderInvalidVersion(t *testing.T) { + col := newFloat32Column() + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + data := make([]byte, 100) + data[0] = 99 // invalid version + data[1] = 0 + data[2] = 0 + data[3] = 10 + binary.LittleEndian.PutUint32(data[4:], 1) + + err := dec.SetData(1, data) + assert.Error(t, err) + assert.Contains(t, err.Error(), "version") +} + +func TestAlpDecoderInvalidLogVectorSize(t *testing.T) { + col := newFloat32Column() + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + data := make([]byte, 100) + data[0] = alpVersion + data[1] = alpCompressionMode + data[2] = alpIntegerEncodingFOR + data[3] = 20 // invalid: > alpMaxLogVectorSize + binary.LittleEndian.PutUint32(data[4:], 1) + + err := dec.SetData(1, data) + assert.Error(t, err) + assert.Contains(t, err.Error(), "log vector size") +} + +func TestAlpDecoderInvalidCompressionMode(t *testing.T) { + col := newFloat32Column() + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + data := make([]byte, 100) + data[0] = alpVersion + data[1] = 99 // invalid compression mode + data[2] = alpIntegerEncodingFOR + data[3] = alpDefaultLogVector + binary.LittleEndian.PutUint32(data[4:], 1) + + err := dec.SetData(1, data) + assert.Error(t, err) + assert.Contains(t, err.Error(), "compression mode") +} + +func TestAlpDecoderInvalidIntegerEncoding(t *testing.T) { + col := newFloat32Column() + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + data := make([]byte, 100) + data[0] = alpVersion + data[1] = alpCompressionMode + data[2] = 99 // invalid integer encoding + data[3] = alpDefaultLogVector + binary.LittleEndian.PutUint32(data[4:], 1) + + err := dec.SetData(1, data) + assert.Error(t, err) + assert.Contains(t, err.Error(), "integer encoding") +} + +func TestAlpDecoderTruncatedOffsetArray(t *testing.T) { + col := newFloat32Column() + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + + data := make([]byte, alpHeaderSize+2) // not enough for 4-byte offset + data[0] = alpVersion + data[1] = alpCompressionMode + data[2] = alpIntegerEncodingFOR + data[3] = alpDefaultLogVector + binary.LittleEndian.PutUint32(data[4:], 1024) + + err := dec.SetData(1024, data) + assert.Error(t, err) + assert.Contains(t, err.Error(), "too short") +} +func TestAlpPresetCacheBuilds(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + + values := make([]float32, 9000) + for i := range values { + values[i] = float32(i) * 0.01 + } + enc.Put(values) + + assert.NotNil(t, enc.cachedPresets, "preset cache should be built after 8 vectors") + assert.True(t, len(enc.cachedPresets) > 0, "should have at least one preset") + assert.True(t, len(enc.cachedPresets) <= alpMaxPresetCombinations, "should not exceed max presets") + + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(values), buf.Bytes()) + require.NoError(t, err) + + out := make([]float32, len(values)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out[i]), + "index %d: got %v, want %v", i, out[i], v) + } +} +func TestAlpEstimatedDataEncodedSize(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + + assert.Equal(t, int64(0), enc.EstimatedDataEncodedSize()) + + values := make([]float32, 100) + for i := range values { + values[i] = float32(i) * 0.01 + } + enc.Put(values) + size := enc.EstimatedDataEncodedSize() + assert.True(t, size > 0, "estimated size should be positive after Put") +} +func TestAlpFloat32IncrementalPut(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + enc := newAlpEncoder[float32](format.Encoding_ALP, col, mem) + allValues := make([]float32, 2500) + for i := range allValues { + allValues[i] = float32(i) * 0.01 + } + + offset := 0 + chunks := []int{100, 200, 500, 724, 976} // total = 2500 + for _, chunk := range chunks { + enc.Put(allValues[offset : offset+chunk]) + offset += chunk + } + + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := &alpDecoder[float32]{decoder: newDecoderBase(format.Encoding_ALP, col)} + err = dec.SetData(len(allValues), buf.Bytes()) + require.NoError(t, err) + + out := make([]float32, len(allValues)) + n, err := dec.Decode(out) + require.NoError(t, err) + assert.Equal(t, len(allValues), n) + + for i, v := range allValues { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out[i]), + "index %d: got %v, want %v", i, out[i], v) + } +} +func TestAlpFloat32MixedDataPatterns(t *testing.T) { + var values []float32 + + for i := 0; i < 500; i++ { + values = append(values, float32(i)*0.01) + } + for i := 0; i < 500; i++ { + values = append(values, float32(i)) + } + for i := 0; i < 500; i++ { + values = append(values, float32(i)*0.001+100.0) + } + values = append(values, float32(math.Inf(1))) + values = append(values, float32(math.Inf(-1))) + values = append(values, math.Float32frombits(alpNegZeroFloat32Bits)) + for i := 0; i < 497; i++ { + values = append(values, float32(i)*0.1) + } + + alpFloat32RoundTrip(t, values) +} + +func TestAlpFloat64MixedDataPatterns(t *testing.T) { + var values []float64 + + for i := 0; i < 500; i++ { + values = append(values, float64(i)*0.01) + } + for i := 0; i < 500; i++ { + values = append(values, float64(i)) + } + for i := 0; i < 500; i++ { + values = append(values, float64(i)*0.001+100.0) + } + values = append(values, math.Inf(1)) + values = append(values, math.Inf(-1)) + values = append(values, math.Float64frombits(alpNegZeroFloat64Bits)) + for i := 0; i < 497; i++ { + values = append(values, float64(i)*0.1) + } + + alpFloat64RoundTrip(t, values) +} +func TestAlpFloat32ViaPublicAPI(t *testing.T) { + col := newFloat32Column() + mem := memory.DefaultAllocator + + values := make([]float32, 2000) + for i := range values { + values[i] = float32(i) * 0.01 + } + + enc := NewEncoder(parquet.Types.Float, parquet.Encodings.ALP, false, col, mem) + assert.Equal(t, parquet.Encodings.ALP, enc.Encoding()) + assert.Equal(t, parquet.Types.Float, enc.Type()) + + enc.(Float32Encoder).Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := NewDecoder(parquet.Types.Float, parquet.Encodings.ALP, col, mem) + assert.Equal(t, parquet.Encodings.ALP, dec.Encoding()) + assert.Equal(t, parquet.Types.Float, dec.Type()) + + dec.SetData(len(values), buf.Bytes()) + out := make([]float32, len(values)) + n, err := dec.(Float32Decoder).Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float32bits(v), math.Float32bits(out[i]), + "index %d: mismatch", i) + } +} + +func TestAlpFloat64ViaPublicAPI(t *testing.T) { + col := newFloat64Column() + mem := memory.DefaultAllocator + + values := make([]float64, 2000) + for i := range values { + values[i] = float64(i) * 0.01 + } + + enc := NewEncoder(parquet.Types.Double, parquet.Encodings.ALP, false, col, mem) + assert.Equal(t, parquet.Encodings.ALP, enc.Encoding()) + assert.Equal(t, parquet.Types.Double, enc.Type()) + + enc.(Float64Encoder).Put(values) + buf, err := enc.FlushValues() + require.NoError(t, err) + defer buf.Release() + + dec := NewDecoder(parquet.Types.Double, parquet.Encodings.ALP, col, mem) + assert.Equal(t, parquet.Encodings.ALP, dec.Encoding()) + assert.Equal(t, parquet.Types.Double, dec.Type()) + + dec.SetData(len(values), buf.Bytes()) + out := make([]float64, len(values)) + n, err := dec.(Float64Decoder).Decode(out) + require.NoError(t, err) + assert.Equal(t, len(values), n) + + for i, v := range values { + assert.Equal(t, math.Float64bits(v), math.Float64bits(out[i]), + "index %d: mismatch", i) + } +} + +func TestAlpPanicsForNonFloatTypes(t *testing.T) { + col := schema.NewColumn(schema.NewInt32Node("int32", parquet.Repetitions.Required, -1), 0, 0) + mem := memory.DefaultAllocator + + assert.Panics(t, func() { + NewEncoder(parquet.Types.Int32, parquet.Encodings.ALP, false, col, mem) + }, "ALP should panic for non-float encoder") + + assert.Panics(t, func() { + NewDecoder(parquet.Types.Int32, parquet.Encodings.ALP, col, mem) + }, "ALP should panic for non-float decoder") +} + diff --git a/parquet/internal/encoding/encoding_test.go b/parquet/internal/encoding/encoding_test.go index 93e830dc8..1b0299af4 100644 --- a/parquet/internal/encoding/encoding_test.go +++ b/parquet/internal/encoding/encoding_test.go @@ -444,6 +444,17 @@ func (b *BaseEncodingTestSuite) TestByteStreamSplitRoundTrip() { } } +func (b *BaseEncodingTestSuite) TestAlpRoundTrip() { + b.initData(10000, 1) + + switch b.typ { + case reflect.TypeOf(float32(0)), reflect.TypeOf(float64(0)): + b.checkRoundTrip(parquet.Encodings.ALP) + default: + b.Panics(func() { b.checkRoundTrip(parquet.Encodings.ALP) }) + } +} + func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() { exec := func(vals, repeats int, validBitsOffset int64, nullProb float64) { b.Run(fmt.Sprintf("%d vals %d repeats %d offset %0.3f null", vals, repeats, validBitsOffset, 1-nullProb), func() { diff --git a/parquet/internal/encoding/typed_encoder.go b/parquet/internal/encoding/typed_encoder.go index 1cba18da5..2ec882d5d 100644 --- a/parquet/internal/encoding/typed_encoder.go +++ b/parquet/internal/encoding/typed_encoder.go @@ -123,6 +123,8 @@ func (floatEncodingTraits[T]) Encoder(e format.Encoding, useDict bool, descr *sc encoder: newEncoderBase(e, descr, mem), }, } + case format.Encoding_ALP: + return newAlpEncoder[T](e, descr, mem) default: panic("unimplemented encoding for float types: " + e.String()) } @@ -243,6 +245,8 @@ func (floatDecoderTraits[T]) Decoder(e parquet.Encoding, descr *schema.Column, u return &PlainDecoder[T]{decoder: newDecoderBase(format.Encoding(e), descr)} case parquet.Encodings.ByteStreamSplit: return &ByteStreamSplitDecoder[T]{decoder: newDecoderBase(format.Encoding(e), descr)} + case parquet.Encodings.ALP: + return &alpDecoder[T]{decoder: newDecoderBase(format.Encoding(e), descr)} default: panic("unimplemented encoding for float types: " + e.String()) } diff --git a/parquet/internal/gen-go/parquet/parquet.go b/parquet/internal/gen-go/parquet/parquet.go index db24d3907..dfcac2612 100644 --- a/parquet/internal/gen-go/parquet/parquet.go +++ b/parquet/internal/gen-go/parquet/parquet.go @@ -357,6 +357,7 @@ const ( Encoding_DELTA_BYTE_ARRAY Encoding = 7 Encoding_RLE_DICTIONARY Encoding = 8 Encoding_BYTE_STREAM_SPLIT Encoding = 9 + Encoding_ALP Encoding = 10 ) func (p Encoding) String() string { @@ -370,6 +371,7 @@ func (p Encoding) String() string { case Encoding_DELTA_BYTE_ARRAY: return "DELTA_BYTE_ARRAY" case Encoding_RLE_DICTIONARY: return "RLE_DICTIONARY" case Encoding_BYTE_STREAM_SPLIT: return "BYTE_STREAM_SPLIT" + case Encoding_ALP: return "ALP" } return "" } @@ -385,6 +387,7 @@ func EncodingFromString(s string) (Encoding, error) { case "DELTA_BYTE_ARRAY": return Encoding_DELTA_BYTE_ARRAY, nil case "RLE_DICTIONARY": return Encoding_RLE_DICTIONARY, nil case "BYTE_STREAM_SPLIT": return Encoding_BYTE_STREAM_SPLIT, nil + case "ALP": return Encoding_ALP, nil } return Encoding(0), fmt.Errorf("not a valid Encoding string") } diff --git a/parquet/types.go b/parquet/types.go index f8c904c85..7e5e1eb63 100644 --- a/parquet/types.go +++ b/parquet/types.go @@ -297,6 +297,7 @@ var ( DeltaBinaryPacked Encoding DeltaLengthByteArray Encoding ByteStreamSplit Encoding + ALP Encoding }{ Plain: Encoding(format.Encoding_PLAIN), PlainDict: Encoding(format.Encoding_PLAIN_DICTIONARY), @@ -307,6 +308,7 @@ var ( DeltaBinaryPacked: Encoding(format.Encoding_DELTA_BINARY_PACKED), DeltaLengthByteArray: Encoding(format.Encoding_DELTA_LENGTH_BYTE_ARRAY), ByteStreamSplit: Encoding(format.Encoding_BYTE_STREAM_SPLIT), + ALP: Encoding(format.Encoding_ALP), } // ColumnOrders contains constants for the Column Ordering fields