Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/DataFrame.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ __I/O__
* @D.readTsv :: FilePath -> IO DataFrame@
* @D.writeCsv :: FilePath -> DataFrame -> IO ()@
* @D.readParquet :: FilePath -> IO DataFrame@
* @D.readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame@

__Exploration__

Expand Down Expand Up @@ -252,7 +253,14 @@ import DataFrame.IO.CSV as CSV (
writeCsv,
writeSeparated,
)
import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles)
import DataFrame.IO.Parquet as Parquet (
ParquetReadOptions (..),
defaultParquetReadOptions,
readParquet,
readParquetFiles,
readParquetFilesWithOpts,
readParquetWithOpts,
)
import DataFrame.IO.Unstable.CSV as UnstableCSV (
fastReadCsvUnstable,
fastReadTsvUnstable,
Expand Down
155 changes: 114 additions & 41 deletions src/DataFrame/IO/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

module DataFrame.IO.Parquet where

import Control.Exception (throw)
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
Expand All @@ -14,15 +15,19 @@ import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import Data.Word
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import DataFrame.Internal.Expression (Expr, getColumns)
import qualified DataFrame.Operations.Core as DI
import DataFrame.Operations.Merge ()
import qualified DataFrame.Operations.Subset as DS
import System.FilePath.Glob (glob)

import DataFrame.IO.Parquet.Dictionary
Expand All @@ -35,6 +40,21 @@ import System.Directory (doesDirectoryExist)
import qualified Data.Vector.Unboxed as VU
import System.FilePath ((</>))

data ParquetReadOptions = ParquetReadOptions
{ selectedColumns :: Maybe [T.Text]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet is useful for predicate as read options. so as you're reading a file (or series of files) you can do some filtering.

let x = F.col @(Maybe Text) "x"
let opts = defaultParquetReadOpts
  { predicate = x ./= Nothing .&& (x .<= F.lit (Just 100)) }
D.readParquetWithOpts

This will be extremely useful for reading globs.

, predicate :: Maybe (Expr Bool)
, rowRange :: Maybe (Int, Int)
}
deriving (Eq, Show)

defaultParquetReadOptions :: ParquetReadOptions
defaultParquetReadOptions =
ParquetReadOptions
{ selectedColumns = Nothing
, predicate = Nothing
, rowRange = Nothing
}

{- | Read a parquet file from path and load it into a dataframe.

==== __Example__
Expand All @@ -43,10 +63,38 @@ ghci> D.readParquet ".\/data\/mtcars.parquet"
@
-}
readParquet :: FilePath -> IO DataFrame
readParquet path = do
readParquet = readParquetWithOpts defaultParquetReadOptions

readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetWithOpts opts path = do
fileMetadata <- readMetadataFromPath path
let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata)
let columnNames = map fst columnPaths
let leafNames = map (last . T.splitOn ".") columnNames
let availableSelectedColumns = L.nub leafNames
let predicateColumns = maybe [] (L.nub . getColumns) (predicate opts)
let selectedColumnsForRead = case selectedColumns opts of
Nothing -> Nothing
Just selected -> Just (L.nub (selected ++ predicateColumns))
let selectedColumnSet = S.fromList <$> selectedColumnsForRead
let shouldReadColumn colName _ =
case selectedColumnSet of
Nothing -> True
Just selected -> colName `S.member` selected

case selectedColumnsForRead of
Nothing -> pure ()
Just requested ->
let missing = requested L.\\ availableSelectedColumns
in unless
(L.null missing)
( throw
( ColumnNotFoundException
(T.pack $ show missing)
"readParquetWithOpts"
availableSelectedColumns
)
)

colMap <- newIORef (M.empty :: M.Map T.Text DI.Column)
lTypeMap <- newIORef (M.empty :: M.Map T.Text LogicalType)
Expand Down Expand Up @@ -77,53 +125,59 @@ readParquet path = do
then T.pack $ "col_" ++ show colIdx
else T.pack $ last colPath

let colDataPageOffset = columnDataPageOffset metadata
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
let colStart =
if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
then colDictionaryPageOffset
else colDataPageOffset
let colLength = columnTotalCompressedSize metadata

let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)

pages <- readAllPages (columnCodec metadata) columnBytes

let maybeTypeLength =
if columnType metadata == PFIXED_LEN_BYTE_ARRAY
then getTypeLength colPath
else Nothing

let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))

let schemaTail = drop 1 (schema fileMetadata)
let colPath = columnPathInSchema (columnMetaData colChunk)
let (maxDef, maxRep) = levelsForPath schemaTail colPath
let lType = logicalType (schemaTail !! colIdx)
column <-
processColumnPages
(maxDef, maxRep)
pages
(columnType metadata)
primaryEncoding
maybeTypeLength
lType

modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
modifyIORef lTypeMap (M.insert colName lType)
when (shouldReadColumn colName colPath) $ do
let colDataPageOffset = columnDataPageOffset metadata
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
let colStart =
if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
then colDictionaryPageOffset
else colDataPageOffset
let colLength = columnTotalCompressedSize metadata

let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)

pages <- readAllPages (columnCodec metadata) columnBytes

let maybeTypeLength =
if columnType metadata == PFIXED_LEN_BYTE_ARRAY
then getTypeLength colPath
else Nothing

let primaryEncoding = maybe EPLAIN fst (L.uncons (columnEncodings metadata))

let schemaTail = drop 1 (schema fileMetadata)
let (maxDef, maxRep) = levelsForPath schemaTail colPath
let lType = logicalType (schemaTail !! colIdx)
column <-
processColumnPages
(maxDef, maxRep)
pages
(columnType metadata)
primaryEncoding
maybeTypeLength
lType

modifyIORef colMap (M.insertWith DI.concatColumnsEither colName column)
modifyIORef lTypeMap (M.insert colName lType)

finalColMap <- readIORef colMap
finalLTypeMap <- readIORef lTypeMap
let orderedColumns =
map
( \name -> (name, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name)
( \name ->
( name
, applyLogicalType (finalLTypeMap M.! name) $ finalColMap M.! name
)
)
(filter (`M.member` finalColMap) columnNames)

pure $ DI.fromNamedColumns orderedColumns
pure $ applyReadOptions opts (DI.fromNamedColumns orderedColumns)

readParquetFiles :: FilePath -> IO DataFrame
readParquetFiles path = do
readParquetFiles = readParquetFilesWithOpts defaultParquetReadOptions

readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetFilesWithOpts opts path = do
isDir <- doesDirectoryExist path

let pat = if isDir then path </> "*" else path
Expand All @@ -137,8 +191,27 @@ readParquetFiles path = do
error $
"readParquetFiles: no parquet files found for " ++ path
_ -> do
dfs <- mapM readParquet files
pure (mconcat dfs)
let optsWithoutRowRange = opts{rowRange = Nothing}
dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files
pure (applyRowRange opts (mconcat dfs))

applyRowRange :: ParquetReadOptions -> DataFrame -> DataFrame
applyRowRange opts df =
maybe df (`DS.range` df) (rowRange opts)

applySelectedColumns :: ParquetReadOptions -> DataFrame -> DataFrame
applySelectedColumns opts df =
maybe df (`DS.select` df) (selectedColumns opts)

applyPredicate :: ParquetReadOptions -> DataFrame -> DataFrame
applyPredicate opts df =
maybe df (`DS.filterWhere` df) (predicate opts)

applyReadOptions :: ParquetReadOptions -> DataFrame -> DataFrame
applyReadOptions opts =
applyRowRange opts
. applySelectedColumns opts
. applyPredicate opts

readMetadataFromPath :: FilePath -> IO FileMetadata
readMetadataFromPath path = do
Expand Down Expand Up @@ -310,7 +383,7 @@ processColumnPages (maxDef, maxRep) pages pType _ maybeTypeLength lType = do
L.foldl' (\l r -> fromRight (error "concat failed") (DI.concatColumns l r)) c cs

applyLogicalType :: LogicalType -> DI.Column -> DI.Column
applyLogicalType (TimestampType isUTC unit) col =
applyLogicalType (TimestampType _ unit) col =
fromRight col $
DI.mapColumn
(microsecondsToUTCTime . (* (1_000_000 `div` unitDivisor unit)))
Expand Down
118 changes: 118 additions & 0 deletions tests/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module Parquet where

import Assertions (assertExpectException)
import qualified DataFrame as D
import qualified DataFrame.Functions as F

Expand Down Expand Up @@ -186,6 +187,117 @@ allTypesDictionary =
(unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet"))
)

selectedColumnsWithOpts :: Test
selectedColumnsWithOpts =
TestCase
( assertEqual
"selectedColumnsWithOpts"
(D.select ["id", "bool_col"] allTypes)
( unsafePerformIO
( D.readParquetWithOpts
(D.defaultParquetReadOptions{D.selectedColumns = Just ["id", "bool_col"]})
"./tests/data/alltypes_plain.parquet"
)
)
)

rowRangeWithOpts :: Test
rowRangeWithOpts =
TestCase
( assertEqual
"rowRangeWithOpts"
(3, 11)
( unsafePerformIO
( D.dimensions
<$> D.readParquetWithOpts
(D.defaultParquetReadOptions{D.rowRange = Just (2, 5)})
"./tests/data/alltypes_plain.parquet"
)
)
)

predicateWithOpts :: Test
predicateWithOpts =
TestCase
( assertEqual
"predicateWithOpts"
(D.fromNamedColumns [("id", D.fromList [6 :: Int32, 7])])
( unsafePerformIO
( D.readParquetWithOpts
( D.defaultParquetReadOptions
{ D.selectedColumns = Just ["id"]
, D.predicate =
Just
( F.geq
(F.col @Int32 "id")
(F.lit (6 :: Int32))
)
}
)
"./tests/data/alltypes_plain.parquet"
)
)
)

predicateUsesNonSelectedColumnWithOpts :: Test
predicateUsesNonSelectedColumnWithOpts =
TestCase
( assertEqual
"predicateUsesNonSelectedColumnWithOpts"
(D.fromNamedColumns [("bool_col", D.fromList [True, False])])
( unsafePerformIO
( D.readParquetWithOpts
( D.defaultParquetReadOptions
{ D.selectedColumns = Just ["bool_col"]
, D.predicate =
Just
( F.geq
(F.col @Int32 "id")
(F.lit (6 :: Int32))
)
}
)
"./tests/data/alltypes_plain.parquet"
)
)
)

predicateWithOptsAcrossFiles :: Test
predicateWithOptsAcrossFiles =
TestCase
( assertEqual
"predicateWithOptsAcrossFiles"
(4, 1)
( unsafePerformIO
( D.dimensions
<$> D.readParquetFilesWithOpts
( D.defaultParquetReadOptions
{ D.selectedColumns = Just ["id"]
, D.predicate =
Just
( F.geq
(F.col @Int32 "id")
(F.lit (6 :: Int32))
)
}
)
"./tests/data/alltypes_plain*.parquet"
)
)
)

missingSelectedColumnWithOpts :: Test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This was a good implementation.

missingSelectedColumnWithOpts =
TestCase
( assertExpectException
"missingSelectedColumnWithOpts"
"Column not found"
( D.readParquetWithOpts
(D.defaultParquetReadOptions{D.selectedColumns = Just ["does_not_exist"]})
"./tests/data/alltypes_plain.parquet"
)
)

transactions :: D.DataFrame
transactions =
D.fromNamedColumns
Expand Down Expand Up @@ -819,6 +931,12 @@ tests =
[ allTypesPlain
, allTypesPlainSnappy
, allTypesDictionary
, selectedColumnsWithOpts
, rowRangeWithOpts
, predicateWithOpts
, predicateUsesNonSelectedColumnWithOpts
, predicateWithOptsAcrossFiles
, missingSelectedColumnWithOpts
, mtCars
, allTypesTinyPagesLastFew
, allTypesTinyPagesDimensions
Expand Down