diff --git a/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb b/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb index 6a4796997..dfc1d174d 100644 --- a/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb +++ b/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb @@ -10,7 +10,19 @@ "\n", "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google/grain/blob/main/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb)\n", "\n", - "This tutorial provides an example of how to read data from [Apache Parquet](https://parquet.apache.org/) file, also covers how to process and transform the data with Grain.\n" + "This tutorial provides an example of how to read data from an\n", + "[Apache Parquet](https://parquet.apache.org/) file, and how to process and\n", + "transform the data with Grain.\n", + "\n", + "`grain.experimental.ParquetIterDataset` is a thin wrapper around\n", + "[`pyarrow.parquet.ParquetFile`](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).\n", + "Any extra keyword arguments passed to `ParquetIterDataset(...)` are forwarded to\n", + "`ParquetFile`, so you can use the PyArrow reference to discover supported read\n", + "options.\n", + "\n", + "Internally, Grain streams one row group at a time from each parquet file. If you\n", + "pass multiple paths, Grain lazily interleaves per-file iterators, which is\n", + "useful for sharded datasets that do not support efficient random access." ] }, { @@ -112,6 +124,78 @@ "list(ds)[0]" ] }, + { + "metadata": { + "id": "Hw9pJklva2kk" + }, + "cell_type": "markdown", + "source": [ + "Example of passing some `kwargs` through to `ParquetFile`." 
+ ] + }, + { + "metadata": { + "id": "5aw05FKzazvl" + }, + "cell_type": "code", + "source": [ + "\n", + "ds = grain.experimental.ParquetIterDataset(\n", + " './emails.parquet',\n", + " memory_map=True,\n", + ")\n", + "list(ds)[0]" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": { + "id": "vur8ySS7if_o" + }, + "cell_type": "markdown", + "source": [ + "To read multiple parquet files in parallel, provide a list of paths. Grain will\n", + "automatically use InterleaveIterDataset internally to interleave the files.\n" + ] + }, + { + "metadata": { + "id": "0e3qwHwKimlo" + }, + "cell_type": "code", + "source": [ + "ds = grain.experimental.ParquetIterDataset(\n", + " ['./emails.parquet', './emails.parquet'],\n", + ")" + ], + "outputs": [], + "execution_count": null + }, + { + "metadata": { + "id": "Sc13IncMit1e" + }, + "cell_type": "markdown", + "source": [ + "You can also use `grain.experimental.ThreadPrefetchIterDataset` to overlap\n", + "CPU-side reading with downstream work.\n" + ] + }, + { + "metadata": { + "id": "Ga8LeFN5iux0" + }, + "cell_type": "code", + "source": [ + "ds = grain.experimental.ThreadPrefetchIterDataset(\n", + " ds,\n", + " prefetch_buffer_size=10,\n", + ")" + ], + "outputs": [], + "execution_count": null + }, { "cell_type": "markdown", "metadata": { diff --git a/docs/tutorials/data_sources/parquet_dataset_tutorial.md b/docs/tutorials/data_sources/parquet_dataset_tutorial.md index a1f8853cc..026adc705 100644 --- a/docs/tutorials/data_sources/parquet_dataset_tutorial.md +++ b/docs/tutorials/data_sources/parquet_dataset_tutorial.md @@ -16,7 +16,19 @@ jupyter: [![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google/grain/blob/main/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb) -This tutorial provides an example of how to read data from [Apache Parquet](https://parquet.apache.org/) file, also covers how to process and transform the data with Grain. 
+This tutorial provides an example of how to read data from an +[Apache Parquet](https://parquet.apache.org/) file, and how to process and +transform the data with Grain. + +`grain.experimental.ParquetIterDataset` is a thin wrapper around +[`pyarrow.parquet.ParquetFile`](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html). +Any extra keyword arguments passed to `ParquetIterDataset(...)` are forwarded to +`ParquetFile`, so you can use the PyArrow reference to discover supported read +options. + +Internally, Grain streams one row group at a time from each parquet file. If you +pass multiple paths, Grain lazily interleaves per-file iterators, which is +useful for sharded datasets that do not support efficient random access. @@ -75,6 +87,35 @@ ds = grain.experimental.ParquetIterDataset('./emails.parquet') list(ds)[0] ``` +Example of passing some `kwargs` through to `ParquetFile`. + +```python +ds = grain.experimental.ParquetIterDataset( + './emails.parquet', + memory_map=True, +) +list(ds)[0] +``` + +To read multiple parquet files in parallel, provide a list of paths. Grain will +automatically use InterleaveIterDataset internally to interleave the files. + +```python +ds = grain.experimental.ParquetIterDataset( + ['./emails.parquet', './emails.parquet'], +) +``` + +You can also use `grain.experimental.ThreadPrefetchIterDataset` to overlap +CPU-side reading with downstream work. + +```python +ds = grain.experimental.ThreadPrefetchIterDataset( + ds, + prefetch_buffer_size=10, +) +``` + ## Transform Dataset diff --git a/grain/_src/python/dataset/sources/parquet_dataset.py b/grain/_src/python/dataset/sources/parquet_dataset.py index 4d3b565b5..fcfc3cf84 100644 --- a/grain/_src/python/dataset/sources/parquet_dataset.py +++ b/grain/_src/python/dataset/sources/parquet_dataset.py @@ -86,7 +86,20 @@ def set_state(self, state): class ParquetIterDataset(dataset.IterDataset[T]): - """An IterDataset for a parquet format file.""" + """An IterDataset for parquet files. 
+ + ``ParquetIterDataset`` is a thin wrapper around + :class:`pyarrow.parquet.ParquetFile`. + + Records are streamed row-group by row-group instead of loading the whole file + into memory at once. + + When multiple paths are provided, Grain lazily creates per-file iterators and + interleaves them with a bounded cycle length (default: 16). Then this + dataset can be combined with Grain prefetching utilities such as + ``ThreadPrefetchIterDataset`` or ``device_put`` to overlap CPU-side reading + with downstream work. + """ def __init__( self, @@ -97,9 +110,13 @@ def __init__( Args: path: A path or sequence of paths to parquet format files. If multiple - paths are provided, they are interleaved with at most 16 files read in - parallel. - **read_kwargs: Keyword arguments to pass to pyarrow.parquet.ParquetFile. + paths are provided, they are lazily interleaved with at most 16 active + file iterators at a time. + **read_kwargs: Keyword arguments to pass to + :class:`pyarrow.parquet.ParquetFile`. Refer to the PyArrow `ParquetFile + API + <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html>`__ + for the supported options. """ super().__init__() if isinstance(path, (str, bytes)):