diff --git a/src/DataFrame.hs b/src/DataFrame.hs index 943a508..088fc7a 100644 --- a/src/DataFrame.hs +++ b/src/DataFrame.hs @@ -105,6 +105,7 @@ __I/O__ * @D.readTsv :: FilePath -> IO DataFrame@ * @D.writeCsv :: FilePath -> DataFrame -> IO ()@ * @D.readParquet :: FilePath -> IO DataFrame@ + * @D.readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame@ __Exploration__ @@ -252,7 +253,14 @@ import DataFrame.IO.CSV as CSV ( writeCsv, writeSeparated, ) -import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles) +import DataFrame.IO.Parquet as Parquet ( + ParquetReadOptions (..), + defaultParquetReadOptions, + readParquet, + readParquetFiles, + readParquetFilesWithOpts, + readParquetWithOpts, + ) import DataFrame.IO.Unstable.CSV as UnstableCSV ( fastReadCsvUnstable, fastReadTsvUnstable, diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 7e667fd..e8353e5 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -6,6 +6,7 @@ module DataFrame.IO.Parquet where +import Control.Exception (throw) import Control.Monad import Data.Bits import qualified Data.ByteString as BSO @@ -14,15 +15,19 @@ import Data.IORef import Data.Int import qualified Data.List as L import qualified Data.Map as M +import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding import Data.Time import Data.Time.Clock.POSIX (posixSecondsToUTCTime) import Data.Word +import DataFrame.Errors (DataFrameException (ColumnNotFoundException)) import qualified DataFrame.Internal.Column as DI import DataFrame.Internal.DataFrame (DataFrame) +import DataFrame.Internal.Expression (Expr, getColumns) import qualified DataFrame.Operations.Core as DI import DataFrame.Operations.Merge () +import qualified DataFrame.Operations.Subset as DS import System.FilePath.Glob (glob) import DataFrame.IO.Parquet.Dictionary @@ -35,6 +40,21 @@ import System.Directory (doesDirectoryExist) import qualified Data.Vector.Unboxed as VU import 
System.FilePath ((</>)) +data ParquetReadOptions = ParquetReadOptions + { selectedColumns :: Maybe [T.Text] + , predicate :: Maybe (Expr Bool) + , rowRange :: Maybe (Int, Int) + } + deriving (Eq, Show) + +defaultParquetReadOptions :: ParquetReadOptions +defaultParquetReadOptions = + ParquetReadOptions + { selectedColumns = Nothing + , predicate = Nothing + , rowRange = Nothing + } + {- | Read a parquet file from path and load it into a dataframe. ==== __Example__ @@ -43,10 +63,38 @@ ghci> D.readParquet ".\/data\/mtcars.parquet" @ -} readParquet :: FilePath -> IO DataFrame -readParquet path = do +readParquet = readParquetWithOpts defaultParquetReadOptions + +readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame +readParquetWithOpts opts path = do fileMetadata <- readMetadataFromPath path let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata) let columnNames = map fst columnPaths + let leafNames = map (last . T.splitOn ".") columnNames + let availableSelectedColumns = L.nub leafNames + let predicateColumns = maybe [] (L.nub . 
getColumns) (predicate opts) + let selectedColumnsForRead = case selectedColumns opts of + Nothing -> Nothing + Just selected -> Just (L.nub (selected ++ predicateColumns)) + let selectedColumnSet = S.fromList <$> selectedColumnsForRead + let shouldReadColumn colName _ = + case selectedColumnSet of + Nothing -> True + Just selected -> colName `S.member` selected + + case selectedColumnsForRead of + Nothing -> pure () + Just requested -> + let missing = requested L.\\ availableSelectedColumns + in unless + (L.null missing) + ( throw + ( ColumnNotFoundException + (T.pack $ show missing) + "readParquetWithOpts" + availableSelectedColumns + ) + ) colMap <- newIORef (M.empty :: M.Map T.Text DI.Column) lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType) @@ -77,53 +125,59 @@ readParquet path = do then T.pack $ "col_" ++ show colIdx else T.pack $ last colPath - let colDataPageOffset = columnDataPageOffset metadata - let colDictionaryPageOffset = columnDictionaryPageOffset metadata - let colStart = - if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset - then colDictionaryPageOffset - else colDataPageOffset - let colLength = columnTotalCompressedSize metadata - - let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) - - pages <- readAllPages (columnCodec metadata) columnBytes - - let maybeTypeLength = - if columnType metadata == PFIXED_LEN_BYTE_ARRAY - then getTypeLength colPath - else Nothing - - let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata)) - - let schemaTail = drop 1 (schema fileMetadata) - let colPath = columnPathInSchema (columnMetaData colChunk) - let (maxDef, maxRep) = levelsForPath schemaTail colPath - let lType = logicalType (schemaTail !! 
colIdx) - column <- - processColumnPages - (maxDef, maxRep) - pages - (columnType metadata) - primaryEncoding - maybeTypeLength - lType - - modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column) - modifyIORef lTypeMap (M.insert colName lType) + when (shouldReadColumn colName colPath) $ do + let colDataPageOffset = columnDataPageOffset metadata + let colDictionaryPageOffset = columnDictionaryPageOffset metadata + let colStart = + if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset + then colDictionaryPageOffset + else colDataPageOffset + let colLength = columnTotalCompressedSize metadata + + let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) + + pages <- readAllPages (columnCodec metadata) columnBytes + + let maybeTypeLength = + if columnType metadata == PFIXED_LEN_BYTE_ARRAY + then getTypeLength colPath + else Nothing + + let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata)) + + let schemaTail = drop 1 (schema fileMetadata) + let (maxDef, maxRep) = levelsForPath schemaTail colPath + let lType = logicalType (schemaTail !! colIdx) + column <- + processColumnPages + (maxDef, maxRep) + pages + (columnType metadata) + primaryEncoding + maybeTypeLength + lType + + modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column) + modifyIORef lTypeMap (M.insert colName lType) finalColMap <- readIORef colMap finalLTypeMap <- readIORef lTypeMap let orderedColumns = map - ( \name -> (name, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name) + ( \name -> + ( name + , applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! 
name + ) ) (filter (`M.member` finalColMap) columnNames) - pure $ DI.fromNamedColumns orderedColumns + pure $ applyReadOptions opts (DI.fromNamedColumns orderedColumns) readParquetFiles :: FilePath -> IO DataFrame -readParquetFiles path = do +readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions + +readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame +readParquetFilesWithOpts opts path = do isDir <- doesDirectoryExist path let pat = if isDir then path </> "*" else path @@ -137,8 +191,27 @@ readParquetFiles path = do error $ "readParquetFiles: no parquet files found for " ++ path _ -> do - dfs <- mapM readParquet files - pure (mconcat dfs) + let optsWithoutRowRange = opts{rowRange = Nothing} + dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files + pure (applyRowRange opts (mconcat dfs)) + +applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame +applyRowRange opts df = + maybe df (`DS.range` df) (rowRange opts) + +applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame +applySelectedColumns opts df = + maybe df (`DS.select` df) (selectedColumns opts) + +applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame +applyPredicate opts df = + maybe df (`DS.filterWhere` df) (predicate opts) + +applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame +applyReadOptions opts = + applyRowRange opts + . applySelectedColumns opts + . applyPredicate opts readMetadataFromPath :: FilePath -> IO FileMetadata readMetadataFromPath path = do @@ -310,7 +383,7 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs applyLogicalType :: LogicalType -> DI.Column -> DI.Column -applyLogicalType (TimestampType isUTC unit) col = +applyLogicalType (TimestampType _ unit) col = fromRight col $ DI.mapColumn (microsecondsToUTCTime . 
(* (1_000_000 `div` unitDivisor unit))) diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 77abd95..82cf506 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -3,6 +3,7 @@ module Parquet where +import Assertions (assertExpectException) import qualified DataFrame as D import qualified DataFrame.Functions as F @@ -186,6 +187,117 @@ allTypesDictionary = (unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet")) ) +selectedColumnsWithOpts :: Test +selectedColumnsWithOpts = + TestCase + ( assertEqual + "selectedColumnsWithOpts" + (D.select ["id", "bool_col"] allTypes) + ( unsafePerformIO + ( D.readParquetWithOpts + (D.defaultParquetReadOptions{D.selectedColumns = Just ["id", "bool_col"]}) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +rowRangeWithOpts :: Test +rowRangeWithOpts = + TestCase + ( assertEqual + "rowRangeWithOpts" + (3, 11) + ( unsafePerformIO + ( D.dimensions + <$> D.readParquetWithOpts + (D.defaultParquetReadOptions{D.rowRange = Just (2, 5)}) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +predicateWithOpts :: Test +predicateWithOpts = + TestCase + ( assertEqual + "predicateWithOpts" + (D.fromNamedColumns [("id", D.fromList [6 :: Int32, 7])]) + ( unsafePerformIO + ( D.readParquetWithOpts + ( D.defaultParquetReadOptions + { D.selectedColumns = Just ["id"] + , D.predicate = + Just + ( F.geq + (F.col @Int32 "id") + (F.lit (6 :: Int32)) + ) + } + ) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +predicateUsesNonSelectedColumnWithOpts :: Test +predicateUsesNonSelectedColumnWithOpts = + TestCase + ( assertEqual + "predicateUsesNonSelectedColumnWithOpts" + (D.fromNamedColumns [("bool_col", D.fromList [True, False])]) + ( unsafePerformIO + ( D.readParquetWithOpts + ( D.defaultParquetReadOptions + { D.selectedColumns = Just ["bool_col"] + , D.predicate = + Just + ( F.geq + (F.col @Int32 "id") + (F.lit (6 :: Int32)) + ) + } + ) + "./tests/data/alltypes_plain.parquet" + ) + ) + ) + +predicateWithOptsAcrossFiles :: Test 
+predicateWithOptsAcrossFiles = + TestCase + ( assertEqual + "predicateWithOptsAcrossFiles" + (4, 1) + ( unsafePerformIO + ( D.dimensions + <$> D.readParquetFilesWithOpts + ( D.defaultParquetReadOptions + { D.selectedColumns = Just ["id"] + , D.predicate = + Just + ( F.geq + (F.col @Int32 "id") + (F.lit (6 :: Int32)) + ) + } + ) + "./tests/data/alltypes_plain*.parquet" + ) + ) + ) + +missingSelectedColumnWithOpts :: Test +missingSelectedColumnWithOpts = + TestCase + ( assertExpectException + "missingSelectedColumnWithOpts" + "Column not found" + ( D.readParquetWithOpts + (D.defaultParquetReadOptions{D.selectedColumns = Just ["does_not_exist"]}) + "./tests/data/alltypes_plain.parquet" + ) + ) + transactions :: D.DataFrame transactions = D.fromNamedColumns @@ -819,6 +931,12 @@ tests = [ allTypesPlain , allTypesPlainSnappy , allTypesDictionary + , selectedColumnsWithOpts + , rowRangeWithOpts + , predicateWithOpts + , predicateUsesNonSelectedColumnWithOpts + , predicateWithOptsAcrossFiles + , missingSelectedColumnWithOpts , mtCars , allTypesTinyPagesLastFew , allTypesTinyPagesDimensions