86 changes: 85 additions & 1 deletion docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb
@@ -10,7 +10,19 @@
"\n",
"[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google/grain/blob/main/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb)\n",
"\n",
"This tutorial provides an example of how to read data from [Apache Parquet](https://parquet.apache.org/) file, also covers how to process and transform the data with Grain.\n"
"This tutorial provides an example of how to read data from an\n",
"[Apache Parquet](https://parquet.apache.org/) file, and how to process and\n",
"transform the data with Grain.\n",
"\n",
"`grain.experimental.ParquetIterDataset` is a thin wrapper around\n",
"[`pyarrow.parquet.ParquetFile`](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).\n",
"Any extra keyword arguments passed to `ParquetIterDataset(...)` are forwarded to\n",
"`ParquetFile`, so you can use the PyArrow reference to discover supported read\n",
"options.\n",
"\n",
"Internally, Grain streams one row group at a time from each parquet file. If you\n",
"pass multiple paths, Grain lazily interleaves per-file iterators, which is\n",
"useful for sharded datasets that do not support efficient random access."
]
},
{
@@ -112,6 +124,78 @@
"list(ds)[0]"
]
},
{
"metadata": {
"id": "Hw9pJklva2kk"
},
"cell_type": "markdown",
"source": [
"The following example forwards a keyword argument, `memory_map`, through `ParquetIterDataset` to `ParquetFile`."
]
},
{
"metadata": {
"id": "5aw05FKzazvl"
},
"cell_type": "code",
"source": [
"\n",
"ds = grain.experimental.ParquetIterDataset(\n",
" './emails.parquet',\n",
" memory_map=True,\n",
")\n",
"list(ds)[0]"
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "vur8ySS7if_o"
},
"cell_type": "markdown",
"source": [
"To read multiple parquet files in parallel, provide a list of paths. Grain will\n",
"automatically use `InterleaveIterDataset` internally to interleave the files.\n"
]
},
{
"metadata": {
"id": "0e3qwHwKimlo"
},
"cell_type": "code",
"source": [
"ds = grain.experimental.ParquetIterDataset(\n",
" ['./emails.parquet', './emails.parquet'],\n",
")"
],
"outputs": [],
"execution_count": null
},
{
"metadata": {
"id": "Sc13IncMit1e"
},
"cell_type": "markdown",
"source": [
"You can also use `grain.experimental.ThreadPrefetchIterDataset` to overlap\n",
"CPU-side reading with downstream work.\n"
]
},
{
"metadata": {
"id": "Ga8LeFN5iux0"
},
"cell_type": "code",
"source": [
"ds = grain.experimental.ThreadPrefetchIterDataset(\n",
" ds,\n",
" prefetch_buffer_size=10,\n",
")"
],
"outputs": [],
"execution_count": null
},
{
"cell_type": "markdown",
"metadata": {
43 changes: 42 additions & 1 deletion docs/tutorials/data_sources/parquet_dataset_tutorial.md
@@ -16,7 +16,19 @@ jupyter:

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/google/grain/blob/main/docs/tutorials/data_sources/parquet_dataset_tutorial.ipynb)

This tutorial provides an example of how to read data from [Apache Parquet](https://parquet.apache.org/) file, also covers how to process and transform the data with Grain.
This tutorial provides an example of how to read data from an
[Apache Parquet](https://parquet.apache.org/) file, and how to process and
transform the data with Grain.

`grain.experimental.ParquetIterDataset` is a thin wrapper around
[`pyarrow.parquet.ParquetFile`](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).
Any extra keyword arguments passed to `ParquetIterDataset(...)` are forwarded to
`ParquetFile`, so you can use the PyArrow reference to discover supported read
options.

Internally, Grain streams one row group at a time from each parquet file. If you
pass multiple paths, Grain lazily interleaves per-file iterators, which is
useful for sharded datasets that do not support efficient random access.

<!-- #endregion -->

@@ -75,6 +87,35 @@ ds = grain.experimental.ParquetIterDataset('./emails.parquet')
list(ds)[0]
```

The following example forwards a keyword argument, `memory_map`, through `ParquetIterDataset` to `ParquetFile`.

```python
ds = grain.experimental.ParquetIterDataset(
    './emails.parquet',
    memory_map=True,
)
list(ds)[0]
```

To read multiple parquet files in parallel, provide a list of paths. Grain will
automatically use `InterleaveIterDataset` internally to interleave the files.

```python
ds = grain.experimental.ParquetIterDataset(
    ['./emails.parquet', './emails.parquet'],
)
```
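
The lazy interleaving described above can be approximated in plain Python. The sketch below is illustrative only and is not Grain's implementation: the `interleave` helper, its `cycle_length` parameter name, and the round-robin ordering are assumptions made for this example, and the element order produced by `InterleaveIterDataset` may differ.

```python
from collections import deque
from typing import Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def interleave(
    make_iterators: Sequence[Callable[[], Iterator[T]]],
    cycle_length: int = 16,
) -> Iterator[T]:
    """Round-robins over at most `cycle_length` lazily opened iterators."""
    if cycle_length < 1:
        raise ValueError("cycle_length must be at least 1")
    pending = deque(make_iterators)  # Factories for files not opened yet.
    active = deque()  # Iterators currently being read.
    while pending or active:
        # Open more files only when a slot in the active set is free.
        while pending and len(active) < cycle_length:
            active.append(pending.popleft()())
        iterator = active.popleft()
        try:
            item = next(iterator)
        except StopIteration:
            continue  # This file is exhausted, so its slot is freed.
        active.append(iterator)  # Move the iterator to the back of the cycle.
        yield item


# Three in-memory "files" stand in for parquet shards (hypothetical data).
shards = [["a0", "a1", "a2"], ["b0"], ["c0", "c1"]]
factories = [lambda rows=rows: iter(rows) for rows in shards]
print(list(interleave(factories, cycle_length=2)))
# → ['a0', 'b0', 'a1', 'a2', 'c0', 'c1']
```

With `cycle_length=2`, the third shard is opened only after the second one is exhausted, which is the property that keeps the number of open files bounded.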

You can also use `grain.experimental.ThreadPrefetchIterDataset` to overlap
CPU-side reading with downstream work.

```python
ds = grain.experimental.ThreadPrefetchIterDataset(
    ds,
    prefetch_buffer_size=10,
)
```
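
The idea behind thread prefetching can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the code behind `ThreadPrefetchIterDataset`: the `thread_prefetch` helper and its `buffer_size` parameter are hypothetical names, and the sketch neither propagates errors from the source nor stops the worker thread if the consumer quits early.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

# Sentinel marking the end of the source (hypothetical, not a Grain name).
_DONE = object()


def thread_prefetch(source: Iterable[T], buffer_size: int) -> Iterator[T]:
    """Yields items from `source`, read ahead by a background thread."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    # A bounded queue: the producer blocks once `buffer_size` items are waiting.
    buffer = queue.Queue(maxsize=buffer_size)

    def producer() -> None:
        try:
            for item in source:
                buffer.put(item)
        finally:
            # Always signal completion so the consumer does not wait forever.
            buffer.put(_DONE)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()
        if item is _DONE:
            return
        yield item


# While the loop body handles one row, the worker thread reads the next ones.
rows = ({"id": i} for i in range(5))
for row in thread_prefetch(rows, buffer_size=2):
    print(row["id"])
```

The buffer size trades memory for slack: a larger buffer absorbs more variation in read latency but holds more rows in memory at once.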

<!-- #region id="BAXS0bgKdrQo" -->
## Transform Dataset
<!-- #endregion -->
25 changes: 21 additions & 4 deletions grain/_src/python/dataset/sources/parquet_dataset.py
@@ -86,7 +86,20 @@ def set_state(self, state):


class ParquetIterDataset(dataset.IterDataset[T]):
  """An IterDataset for a parquet format file."""
  """An IterDataset for parquet files.

  ``ParquetIterDataset`` is a thin wrapper around
  :class:`pyarrow.parquet.ParquetFile`.

  Records are streamed row-group by row-group instead of loading the whole file
  into memory at once.

  When multiple paths are provided, Grain lazily creates per-file iterators and
  interleaves them with a bounded cycle length (default: 16). The resulting
  dataset can be combined with Grain prefetching utilities such as
  ``ThreadPrefetchIterDataset`` or ``device_put`` to overlap CPU-side reading
  with downstream work.
  """

  def __init__(
      self,
@@ -97,9 +110,13 @@ def __init__(

    Args:
      path: A path or sequence of paths to parquet format files. If multiple
        paths are provided, they are interleaved with at most 16 files read in
        parallel.
      **read_kwargs: Keyword arguments to pass to pyarrow.parquet.ParquetFile.
        paths are provided, they are lazily interleaved with at most 16 active
        file iterators at a time.
      **read_kwargs: Keyword arguments to pass to
        :class:`pyarrow.parquet.ParquetFile`. Refer to the PyArrow `ParquetFile
        API
        <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html>`__
        for the supported options.
    """
    super().__init__()
    if isinstance(path, (str, bytes)):