From d5951131b72e7270152655e946ec66a5156bc916 Mon Sep 17 00:00:00 2001 From: Ali Sao Date: Wed, 25 Feb 2026 15:47:52 -0800 Subject: [PATCH 1/3] feat(parquet): add read options for projection, timestamp coercion, and row range Problem: Parquet reads were all-or-nothing. Users could not subset columns at read-time, control timestamp-to-day conversion, or subset rows while loading. This issue also required preserving current behavior for existing callers. Solution: introduce ParquetReadOptions (selectedColumns, timestampPolicy, rowRange) plus defaultParquetReadOptions. Add readParquetWithOpts/readParquetFilesWithOpts and keep readParquet/readParquetFiles as default-option wrappers. Wire selectedColumns into decode-time filtering with fail-fast ColumnNotFoundException for missing requested columns. Wire timestampPolicy with PreserveTimestampPrecision and CoerceTimestampToDay behaviors, including fallback coercion for already-decoded UTCTime columns. Wire rowRange through the reader and apply global rowRange semantics for readParquetFilesWithOpts after concatenation. Tradeoffs and rationale: chose an options record instead of multiple specialized APIs to keep extension points coherent and avoid API sprawl. Kept legacy conversion wrappers/helpers (applyLogicalType and UTC helpers) to reduce compatibility risk for existing/internal call paths. read-time projection improves performance by skipping unselected chunk decode; rowRange currently uses post-read slicing semantics (start inclusive, end exclusive) for correctness and consistency with existing range behavior. Verification: add focused Parquet tests for selectedColumns, rowRange, timestampPolicy coercion, and missing selected column errors; run full suite successfully via cabal test (all passing). 
--- src/DataFrame.hs | 11 +- src/DataFrame/IO/Parquet.hs | 194 +++++++++++++++++++++++++++--------- tests/Parquet.hs | 72 +++++++++++++ 3 files changed, 228 insertions(+), 49 deletions(-) diff --git a/src/DataFrame.hs b/src/DataFrame.hs index 943a508..75b20ec 100644 --- a/src/DataFrame.hs +++ b/src/DataFrame.hs @@ -105,6 +105,7 @@ __I/O__ * @D.readTsv :: FilePath -> IO DataFrame@ * @D.writeCsv :: FilePath -> DataFrame -> IO ()@ * @D.readParquet :: FilePath -> IO DataFrame@ + * @D.readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame@ __Exploration__ @@ -252,7 +253,15 @@ import DataFrame.IO.CSV as CSV ( writeCsv, writeSeparated, ) -import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles) +import DataFrame.IO.Parquet as Parquet ( + ParquetReadOptions (..), + ParquetTimestampPolicy (..), + defaultParquetReadOptions, + readParquet, + readParquetFiles, + readParquetFilesWithOpts, + readParquetWithOpts, + ) import DataFrame.IO.Unstable.CSV as UnstableCSV ( fastReadCsvUnstable, fastReadTsvUnstable, diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 7e667fd..440d4d6 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -7,6 +7,7 @@ module DataFrame.IO.Parquet where import Control.Monad +import Control.Exception (throw) import Data.Bits import qualified Data.ByteString as BSO import Data.Either @@ -14,15 +15,18 @@ import Data.IORef import Data.Int import qualified Data.List as L import qualified Data.Map as M +import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding import Data.Time import Data.Time.Clock.POSIX (posixSecondsToUTCTime) import Data.Word +import DataFrame.Errors (DataFrameException (ColumnNotFoundException)) import qualified DataFrame.Internal.Column as DI import DataFrame.Internal.DataFrame (DataFrame) import qualified DataFrame.Operations.Core as DI import DataFrame.Operations.Merge () +import qualified DataFrame.Operations.Subset as DS import 
System.FilePath.Glob (glob) import DataFrame.IO.Parquet.Dictionary @@ -35,6 +39,26 @@ import System.Directory (doesDirectoryExist) import qualified Data.Vector.Unboxed as VU import System.FilePath (()) +data ParquetTimestampPolicy + = PreserveTimestampPrecision + | CoerceTimestampToDay + deriving (Eq, Show) + +data ParquetReadOptions = ParquetReadOptions + { selectedColumns :: Maybe [T.Text] + , timestampPolicy :: ParquetTimestampPolicy + , rowRange :: Maybe (Int, Int) + } + deriving (Eq, Show) + +defaultParquetReadOptions :: ParquetReadOptions +defaultParquetReadOptions = + ParquetReadOptions + { selectedColumns = Nothing + , timestampPolicy = PreserveTimestampPrecision + , rowRange = Nothing + } + {- | Read a parquet file from path and load it into a dataframe. ==== __Example__ @@ -43,10 +67,36 @@ ghci> D.readParquet ".\/data\/mtcars.parquet" @ -} readParquet :: FilePath -> IO DataFrame -readParquet path = do +readParquet = readParquetWithOpts defaultParquetReadOptions + +readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame +readParquetWithOpts opts path = do fileMetadata <- readMetadataFromPath path let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata) let columnNames = map fst columnPaths + let leafNames = map (last . T.splitOn ".") columnNames + let availableSelectedColumns = L.nub (columnNames ++ leafNames) + let selectedColumnSet = S.fromList <$> selectedColumns opts + let shouldReadColumn colName colPath = + case selectedColumnSet of + Nothing -> True + Just selected -> + let fullPath = T.intercalate "." 
(map T.pack colPath) + in colName `S.member` selected || fullPath `S.member` selected + + case selectedColumns opts of + Nothing -> pure () + Just requested -> + let missing = requested L.\\ availableSelectedColumns + in unless + (L.null missing) + ( throw + ( ColumnNotFoundException + (T.pack $ show missing) + "readParquetWithOpts" + availableSelectedColumns + ) + ) colMap <- newIORef (M.empty :: M.Map T.Text DI.Column) lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType) @@ -77,53 +127,59 @@ readParquet path = do then T.pack $ "col_" ++ show colIdx else T.pack $ last colPath - let colDataPageOffset = columnDataPageOffset metadata - let colDictionaryPageOffset = columnDictionaryPageOffset metadata - let colStart = - if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset - then colDictionaryPageOffset - else colDataPageOffset - let colLength = columnTotalCompressedSize metadata - - let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) - - pages <- readAllPages (columnCodec metadata) columnBytes - - let maybeTypeLength = - if columnType metadata == PFIXED_LEN_BYTE_ARRAY - then getTypeLength colPath - else Nothing - - let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata)) - - let schemaTail = drop 1 (schema fileMetadata) - let colPath = columnPathInSchema (columnMetaData colChunk) - let (maxDef, maxRep) = levelsForPath schemaTail colPath - let lType = logicalType (schemaTail !! 
colIdx) - column <- - processColumnPages - (maxDef, maxRep) - pages - (columnType metadata) - primaryEncoding - maybeTypeLength - lType - - modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column) - modifyIORef lTypeMap (M.insert colName lType) + when (shouldReadColumn colName colPath) $ do + let colDataPageOffset = columnDataPageOffset metadata + let colDictionaryPageOffset = columnDictionaryPageOffset metadata + let colStart = + if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset + then colDictionaryPageOffset + else colDataPageOffset + let colLength = columnTotalCompressedSize metadata + + let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) + + pages <- readAllPages (columnCodec metadata) columnBytes + + let maybeTypeLength = + if columnType metadata == PFIXED_LEN_BYTE_ARRAY + then getTypeLength colPath + else Nothing + + let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata)) + + let schemaTail = drop 1 (schema fileMetadata) + let (maxDef, maxRep) = levelsForPath schemaTail colPath + let lType = logicalType (schemaTail !! colIdx) + column <- + processColumnPages + (maxDef, maxRep) + pages + (columnType metadata) + primaryEncoding + maybeTypeLength + lType + + modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column) + modifyIORef lTypeMap (M.insert colName lType) finalColMap <- readIORef colMap finalLTypeMap <- readIORef lTypeMap let orderedColumns = map - ( \name -> (name, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name) + ( \name -> + ( name + , applyLogicalTypeWithOptions opts (finalLTypeMap M.! name) $ finalColMap M.! 
name + ) ) (filter (`M.member` finalColMap) columnNames) - pure $ DI.fromNamedColumns orderedColumns + pure $ applyRowRange opts (DI.fromNamedColumns orderedColumns) readParquetFiles :: FilePath -> IO DataFrame -readParquetFiles path = do +readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions + +readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame +readParquetFilesWithOpts opts path = do isDir <- doesDirectoryExist path let pat = if isDir then path "*" else path @@ -137,8 +193,14 @@ readParquetFiles path = do error $ "readParquetFiles: no parquet files found for " ++ path _ -> do - dfs <- mapM readParquet files - pure (mconcat dfs) + let optsWithoutRowRange = opts{rowRange = Nothing} + dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files + pure (applyRowRange opts (mconcat dfs)) + +applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame +applyRowRange opts df = case rowRange opts of + Nothing -> df + Just (start, end) -> DS.range (start, end) df readMetadataFromPath :: FilePath -> IO FileMetadata readMetadataFromPath path = do @@ -309,13 +371,12 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do pure $ L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs -applyLogicalType :: LogicalType -> DI.Column -> DI.Column -applyLogicalType (TimestampType isUTC unit) col = - fromRight col $ - DI.mapColumn - (microsecondsToUTCTime . 
(* (1_000_000 `div` unitDivisor unit))) - col -applyLogicalType (DecimalType precision scale) col +applyLogicalTypeWithOptions :: ParquetReadOptions -> LogicalType -> DI.Column -> DI.Column +applyLogicalTypeWithOptions opts (TimestampType _ unit) col = + case timestampPolicy opts of + PreserveTimestampPrecision -> asUTCTime unit col + CoerceTimestampToDay -> asDay unit col +applyLogicalTypeWithOptions _ (DecimalType precision scale) col | precision <= 9 = case DI.toVector @Int32 @VU.Vector col of Right xs -> DI.fromUnboxedVector $ @@ -327,7 +388,44 @@ applyLogicalType (DecimalType precision scale) col VU.map (\raw -> fromIntegral @Int64 @Double raw / 10 ^ scale) xs Left _ -> col | otherwise = col -applyLogicalType _ col = col +applyLogicalTypeWithOptions opts _ col = + case timestampPolicy opts of + CoerceTimestampToDay -> coerceUTCTimeColumnToDay col + PreserveTimestampPrecision -> col + +applyLogicalType :: LogicalType -> DI.Column -> DI.Column +applyLogicalType = applyLogicalTypeWithOptions defaultParquetReadOptions + +asUTCTime :: TimeUnit -> DI.Column -> DI.Column +asUTCTime unit col = case DI.mapColumn (timestampValueToUTCTime unit) col of + Right out -> out + Left _ -> case DI.mapColumn (fmap (timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe UTCTime) col of + Right out -> out + Left _ -> col + +asDay :: TimeUnit -> DI.Column -> DI.Column +asDay unit col = case DI.mapColumn (utctDay . timestampValueToUTCTime unit) col of + Right out -> out + Left _ -> case DI.mapColumn (fmap (utctDay . 
timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe Day) col of + Right out -> out + Left _ -> coerceUTCTimeColumnToDay col + +coerceUTCTimeColumnToDay :: DI.Column -> DI.Column +coerceUTCTimeColumnToDay col = case DI.mapColumn utctDay col of + Right out -> out + Left _ -> case DI.mapColumn (fmap utctDay :: Maybe UTCTime -> Maybe Day) col of + Right out -> out + Left _ -> col + +timestampValueToUTCTime :: TimeUnit -> Int64 -> UTCTime +timestampValueToUTCTime MILLISECONDS value = + posixSecondsToUTCTime (fromIntegral value / 1_000) +timestampValueToUTCTime MICROSECONDS value = + posixSecondsToUTCTime (fromIntegral value / 1_000_000) +timestampValueToUTCTime NANOSECONDS value = + posixSecondsToUTCTime (fromIntegral value / 1_000_000_000) +timestampValueToUTCTime TIME_UNIT_UNKNOWN value = + posixSecondsToUTCTime (fromIntegral value) microsecondsToUTCTime :: Int64 -> UTCTime microsecondsToUTCTime us = diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 77abd95..a2e6006 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -3,6 +3,7 @@ module Parquet where +import Assertions (assertExpectException) import qualified DataFrame as D import qualified DataFrame.Functions as F @@ -186,6 +187,73 @@ allTypesDictionary = (unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet")) ) +selectedColumnsWithOpts :: Test +selectedColumnsWithOpts = + TestCase + ( assertEqual + "selectedColumnsWithOpts" + (D.select ["id", "bool_col"] allTypes) + ( unsafePerformIO + ( D.readParquetWithOpts + (D.defaultParquetReadOptions{D.selectedColumns = Just ["id", "bool_col"]}) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +rowRangeWithOpts :: Test +rowRangeWithOpts = + TestCase + ( assertEqual + "rowRangeWithOpts" + (D.range (2, 5) allTypes) + ( unsafePerformIO + ( D.readParquetWithOpts + (D.defaultParquetReadOptions{D.rowRange = Just (2, 5)}) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +timestampPolicyCoerceToDayWithOpts :: Test 
+timestampPolicyCoerceToDayWithOpts = + TestCase + ( assertEqual + "timestampPolicyCoerceToDayWithOpts" + ( D.fromNamedColumns + [ ( "timestamp_col" + , D.fromList + [ fromGregorian 2009 3 1 :: Day + , fromGregorian 2009 3 1 + ] + ) + ] + ) + ( unsafePerformIO + ( D.readParquetWithOpts + ( D.defaultParquetReadOptions + { D.selectedColumns = Just ["timestamp_col"] + , D.timestampPolicy = D.CoerceTimestampToDay + , D.rowRange = Just (0, 2) + } + ) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +missingSelectedColumnWithOpts :: Test +missingSelectedColumnWithOpts = + TestCase + ( assertExpectException + "missingSelectedColumnWithOpts" + "Column not found" + ( D.readParquetWithOpts + (D.defaultParquetReadOptions{D.selectedColumns = Just ["does_not_exist"]}) + "./tests/data/alltypes_plain.parquet" + ) + ) + transactions :: D.DataFrame transactions = D.fromNamedColumns @@ -819,6 +887,10 @@ tests = [ allTypesPlain , allTypesPlainSnappy , allTypesDictionary + , selectedColumnsWithOpts + , rowRangeWithOpts + , timestampPolicyCoerceToDayWithOpts + , missingSelectedColumnWithOpts , mtCars , allTypesTinyPagesLastFew , allTypesTinyPagesDimensions From d66b0646b97d4d9d2bbcdcd24c1cf48dd8c8d73f Mon Sep 17 00:00:00 2001 From: Ali Sao Date: Wed, 25 Feb 2026 16:47:16 -0800 Subject: [PATCH 2/3] chore(parquet): format reader and Parquet tests Apply formatter-driven layout updates in Parquet read-options code and related tests. No behavior change; this commit is formatting-only after lint/format checks. 
--- src/DataFrame/IO/Parquet.hs | 13 +++++++++---- tests/Parquet.hs | 7 ++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 440d4d6..6efc5e0 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -6,8 +6,8 @@ module DataFrame.IO.Parquet where -import Control.Monad import Control.Exception (throw) +import Control.Monad import Data.Bits import qualified Data.ByteString as BSO import Data.Either @@ -371,7 +371,8 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do pure $ L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs -applyLogicalTypeWithOptions :: ParquetReadOptions -> LogicalType -> DI.Column -> DI.Column +applyLogicalTypeWithOptions :: + ParquetReadOptions -> LogicalType -> DI.Column -> DI.Column applyLogicalTypeWithOptions opts (TimestampType _ unit) col = case timestampPolicy opts of PreserveTimestampPrecision -> asUTCTime unit col @@ -399,14 +400,18 @@ applyLogicalType = applyLogicalTypeWithOptions defaultParquetReadOptions asUTCTime :: TimeUnit -> DI.Column -> DI.Column asUTCTime unit col = case DI.mapColumn (timestampValueToUTCTime unit) col of Right out -> out - Left _ -> case DI.mapColumn (fmap (timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe UTCTime) col of + Left _ -> case DI.mapColumn + (fmap (timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe UTCTime) + col of Right out -> out Left _ -> col asDay :: TimeUnit -> DI.Column -> DI.Column asDay unit col = case DI.mapColumn (utctDay . timestampValueToUTCTime unit) col of Right out -> out - Left _ -> case DI.mapColumn (fmap (utctDay . timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe Day) col of + Left _ -> case DI.mapColumn + (fmap (utctDay . 
timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe Day) + col of Right out -> out Left _ -> coerceUTCTimeColumnToDay col diff --git a/tests/Parquet.hs b/tests/Parquet.hs index a2e6006..1680f03 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -221,12 +221,13 @@ timestampPolicyCoerceToDayWithOpts = ( assertEqual "timestampPolicyCoerceToDayWithOpts" ( D.fromNamedColumns - [ ( "timestamp_col" - , D.fromList + [ + ( "timestamp_col" + , D.fromList [ fromGregorian 2009 3 1 :: Day , fromGregorian 2009 3 1 ] - ) + ) ] ) ( unsafePerformIO From ecda390f1a5d8b7ba3d7d6b98d4476cbb760eeec Mon Sep 17 00:00:00 2001 From: Ali Sao Date: Thu, 26 Feb 2026 13:54:49 -0800 Subject: [PATCH 3/3] add `predicate` to `ParquetReadOptions` with default - apply parquet read options in order: predicate filtering, column projection, then row range - auto-include predicate-referenced columns during decode when `selectedColumns` is set, then project back to requested columns - restrict selected-column matching to leaf names only (drop full-path nested matching) - remove `ParquetTimestampPolicy` and revert timestamp conversion to default behavior - update row-range helper implementation style in `applyRowRange` - revise parquet option tests: make row-range assertion non-circular and add predicate-focused cases --- src/DataFrame.hs | 1 - src/DataFrame/IO/Parquet.hs | 106 +++++++++++++----------------- tests/Parquet.hs | 83 +++++++++++++++++++++------- 3 files changed, 102 insertions(+), 88 deletions(-) diff --git a/src/DataFrame.hs b/src/DataFrame.hs index 75b20ec..088fc7a 100644 --- a/src/DataFrame.hs +++ b/src/DataFrame.hs @@ -255,7 +255,6 @@ import DataFrame.IO.CSV as CSV ( ) import DataFrame.IO.Parquet as Parquet ( ParquetReadOptions (..), - ParquetTimestampPolicy (..), defaultParquetReadOptions, readParquet, readParquetFiles, diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 6efc5e0..e8353e5 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -24,6 +24,7 @@ import Data.Word import DataFrame.Errors (DataFrameException 
(ColumnNotFoundException)) import qualified DataFrame.Internal.Column as DI import DataFrame.Internal.DataFrame (DataFrame) +import DataFrame.Internal.Expression (Expr, getColumns) import qualified DataFrame.Operations.Core as DI import DataFrame.Operations.Merge () import qualified DataFrame.Operations.Subset as DS @@ -39,14 +40,9 @@ import System.Directory (doesDirectoryExist) import qualified Data.Vector.Unboxed as VU import System.FilePath (()) -data ParquetTimestampPolicy - = PreserveTimestampPrecision - | CoerceTimestampToDay - deriving (Eq, Show) - data ParquetReadOptions = ParquetReadOptions { selectedColumns :: Maybe [T.Text] - , timestampPolicy :: ParquetTimestampPolicy + , predicate :: Maybe (Expr Bool) , rowRange :: Maybe (Int, Int) } deriving (Eq, Show) @@ -55,7 +51,7 @@ defaultParquetReadOptions :: ParquetReadOptions defaultParquetReadOptions = ParquetReadOptions { selectedColumns = Nothing - , timestampPolicy = PreserveTimestampPrecision + , predicate = Nothing , rowRange = Nothing } @@ -75,16 +71,18 @@ readParquetWithOpts opts path = do let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata) let columnNames = map fst columnPaths let leafNames = map (last . T.splitOn ".") columnNames - let availableSelectedColumns = L.nub (columnNames ++ leafNames) - let selectedColumnSet = S.fromList <$> selectedColumns opts - let shouldReadColumn colName colPath = + let availableSelectedColumns = L.nub leafNames + let predicateColumns = maybe [] (L.nub . getColumns) (predicate opts) + let selectedColumnsForRead = case selectedColumns opts of + Nothing -> Nothing + Just selected -> Just (L.nub (selected ++ predicateColumns)) + let selectedColumnSet = S.fromList <$> selectedColumnsForRead + let shouldReadColumn colName _ = case selectedColumnSet of Nothing -> True - Just selected -> - let fullPath = T.intercalate "." 
(map T.pack colPath) - in colName `S.member` selected || fullPath `S.member` selected + Just selected -> colName `S.member` selected - case selectedColumns opts of + case selectedColumnsForRead of Nothing -> pure () Just requested -> let missing = requested L.\\ availableSelectedColumns @@ -168,12 +166,12 @@ readParquetWithOpts opts path = do map ( \name -> ( name - , applyLogicalTypeWithOptions opts (finalLTypeMap M.! name) $ finalColMap M.! name + , applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name ) ) (filter (`M.member` finalColMap) columnNames) - pure $ applyRowRange opts (DI.fromNamedColumns orderedColumns) + pure $ applyReadOptions opts (DI.fromNamedColumns orderedColumns) readParquetFiles :: FilePath -> IO DataFrame readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions @@ -198,9 +196,22 @@ readParquetFilesWithOpts opts path = do pure (applyRowRange opts (mconcat dfs)) applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame -applyRowRange opts df = case rowRange opts of - Nothing -> df - Just (start, end) -> DS.range (start, end) df +applyRowRange opts df = + maybe df (`DS.range` df) (rowRange opts) + +applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame +applySelectedColumns opts df = + maybe df (`DS.select` df) (selectedColumns opts) + +applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame +applyPredicate opts df = + maybe df (`DS.filterWhere` df) (predicate opts) + +applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame +applyReadOptions opts = + applyRowRange opts + . applySelectedColumns opts + . 
applyPredicate opts readMetadataFromPath :: FilePath -> IO FileMetadata readMetadataFromPath path = do @@ -371,13 +382,13 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do pure $ L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs -applyLogicalTypeWithOptions :: - ParquetReadOptions -> LogicalType -> DI.Column -> DI.Column -applyLogicalTypeWithOptions opts (TimestampType _ unit) col = - case timestampPolicy opts of - PreserveTimestampPrecision -> asUTCTime unit col - CoerceTimestampToDay -> asDay unit col -applyLogicalTypeWithOptions _ (DecimalType precision scale) col +applyLogicalType :: LogicalType -> DI.Column -> DI.Column +applyLogicalType (TimestampType _ unit) col = + fromRight col $ + DI.mapColumn + (microsecondsToUTCTime . (* (1_000_000 `div` unitDivisor unit))) + col +applyLogicalType (DecimalType precision scale) col | precision <= 9 = case DI.toVector @Int32 @VU.Vector col of Right xs -> DI.fromUnboxedVector $ @@ -389,48 +400,7 @@ applyLogicalTypeWithOptions _ (DecimalType precision scale) col VU.map (\raw -> fromIntegral @Int64 @Double raw / 10 ^ scale) xs Left _ -> col | otherwise = col -applyLogicalTypeWithOptions opts _ col = - case timestampPolicy opts of - CoerceTimestampToDay -> coerceUTCTimeColumnToDay col - PreserveTimestampPrecision -> col - -applyLogicalType :: LogicalType -> DI.Column -> DI.Column -applyLogicalType = applyLogicalTypeWithOptions defaultParquetReadOptions - -asUTCTime :: TimeUnit -> DI.Column -> DI.Column -asUTCTime unit col = case DI.mapColumn (timestampValueToUTCTime unit) col of - Right out -> out - Left _ -> case DI.mapColumn - (fmap (timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe UTCTime) - col of - Right out -> out - Left _ -> col - -asDay :: TimeUnit -> DI.Column -> DI.Column -asDay unit col = case DI.mapColumn (utctDay . timestampValueToUTCTime unit) col of - Right out -> out - Left _ -> case DI.mapColumn - (fmap (utctDay . 
timestampValueToUTCTime unit) :: Maybe Int64 -> Maybe Day) - col of - Right out -> out - Left _ -> coerceUTCTimeColumnToDay col - -coerceUTCTimeColumnToDay :: DI.Column -> DI.Column -coerceUTCTimeColumnToDay col = case DI.mapColumn utctDay col of - Right out -> out - Left _ -> case DI.mapColumn (fmap utctDay :: Maybe UTCTime -> Maybe Day) col of - Right out -> out - Left _ -> col - -timestampValueToUTCTime :: TimeUnit -> Int64 -> UTCTime -timestampValueToUTCTime MILLISECONDS value = - posixSecondsToUTCTime (fromIntegral value / 1_000) -timestampValueToUTCTime MICROSECONDS value = - posixSecondsToUTCTime (fromIntegral value / 1_000_000) -timestampValueToUTCTime NANOSECONDS value = - posixSecondsToUTCTime (fromIntegral value / 1_000_000_000) -timestampValueToUTCTime TIME_UNIT_UNKNOWN value = - posixSecondsToUTCTime (fromIntegral value) +applyLogicalType _ col = col microsecondsToUTCTime :: Int64 -> UTCTime microsecondsToUTCTime us = diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 1680f03..82cf506 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -206,36 +206,55 @@ rowRangeWithOpts = TestCase ( assertEqual "rowRangeWithOpts" - (D.range (2, 5) allTypes) + (3, 11) ( unsafePerformIO - ( D.readParquetWithOpts - (D.defaultParquetReadOptions{D.rowRange = Just (2, 5)}) - "./tests/data/alltypes_plain.parquet" + ( D.dimensions + <$> D.readParquetWithOpts + (D.defaultParquetReadOptions{D.rowRange = Just (2, 5)}) + "./tests/data/alltypes_plain.parquet" ) ) ) -timestampPolicyCoerceToDayWithOpts :: Test -timestampPolicyCoerceToDayWithOpts = +predicateWithOpts :: Test +predicateWithOpts = TestCase ( assertEqual - "timestampPolicyCoerceToDayWithOpts" - ( D.fromNamedColumns - [ - ( "timestamp_col" - , D.fromList - [ fromGregorian 2009 3 1 :: Day - , fromGregorian 2009 3 1 - ] + "predicateWithOpts" + (D.fromNamedColumns [("id", D.fromList [6 :: Int32, 7])]) + ( unsafePerformIO + ( D.readParquetWithOpts + ( D.defaultParquetReadOptions + { D.selectedColumns = Just ["id"] + 
, D.predicate = + Just + ( F.geq + (F.col @Int32 "id") + (F.lit (6 :: Int32)) + ) + } ) - ] + "./tests/data/alltypes_plain.parquet" + ) ) + ) + +predicateUsesNonSelectedColumnWithOpts :: Test +predicateUsesNonSelectedColumnWithOpts = + TestCase + ( assertEqual + "predicateUsesNonSelectedColumnWithOpts" + (D.fromNamedColumns [("bool_col", D.fromList [True, False])]) ( unsafePerformIO ( D.readParquetWithOpts ( D.defaultParquetReadOptions - { D.selectedColumns = Just ["timestamp_col"] - , D.timestampPolicy = D.CoerceTimestampToDay - , D.rowRange = Just (0, 2) + { D.selectedColumns = Just ["bool_col"] + , D.predicate = + Just + ( F.geq + (F.col @Int32 "id") + (F.lit (6 :: Int32)) + ) } ) "./tests/data/alltypes_plain.parquet" @@ -243,6 +262,30 @@ timestampPolicyCoerceToDayWithOpts = ) ) +predicateWithOptsAcrossFiles :: Test +predicateWithOptsAcrossFiles = + TestCase + ( assertEqual + "predicateWithOptsAcrossFiles" + (4, 1) + ( unsafePerformIO + ( D.dimensions + <$> D.readParquetFilesWithOpts + ( D.defaultParquetReadOptions + { D.selectedColumns = Just ["id"] + , D.predicate = + Just + ( F.geq + (F.col @Int32 "id") + (F.lit (6 :: Int32)) + ) + } + ) + "./tests/data/alltypes_plain*.parquet" + ) + ) + ) + missingSelectedColumnWithOpts :: Test missingSelectedColumnWithOpts = TestCase @@ -890,7 +933,9 @@ tests = , allTypesDictionary , selectedColumnsWithOpts , rowRangeWithOpts - , timestampPolicyCoerceToDayWithOpts + , predicateWithOpts + , predicateUsesNonSelectedColumnWithOpts + , predicateWithOptsAcrossFiles , missingSelectedColumnWithOpts , mtCars , allTypesTinyPagesLastFew