40 changes: 18 additions & 22 deletions README.md
@@ -1,5 +1,3 @@
from timdex_dataset_api import TIMDEXDataset

# timdex-dataset-api
Python library for interacting with a TIMDEX parquet dataset located remotely or in S3. This library is often abbreviated as "TDA".

@@ -11,9 +9,10 @@ Python library for interacting with a TIMDEX parquet dataset located remotely or
- To run unit tests: `make test`
- To lint the repo: `make lint`

The library version number is set in [`timdex_dataset_api/__init__.py`](timdex_dataset_api/__init__.py), e.g.:
```python
__version__ = "2.1.0"
The library version number is set in [`pyproject.toml`](pyproject.toml), e.g.:
```toml
[project]
version = "5.0.0"
```

Updating the version number when making changes to the library will prompt applications that install it to pick up the new version when _their_ dependencies are updated.
@@ -74,15 +73,6 @@ With the env var `MINIO_S3_ENDPOINT_URL` set, this library will configure `pyarr

## Usage

Currently, the most common use cases are:
* **Transmogrifier**: uses TDA to **write** to the parquet dataset
* **TIMDEX-Index-Manager (TIM)**: uses TDA to **read** from the parquet dataset

Beyond those two ETL run use cases, others are emerging where this library proves helpful:

* yielding only the current version of all records in the dataset, useful for quickly re-indexing to Opensearch
* high throughput (time) + memory safe (space) access to the dataset for analysis

For both reading and writing, the following env vars are recommended:
```shell
TDA_LOG_LEVEL=INFO
@@ -105,15 +95,19 @@ timdex_dataset = TIMDEXDataset("s3://my-bucket/path/to/dataset")
timdex_dataset = TIMDEXDataset("/path/to/dataset")
```

All read methods for `TIMDEXDataset` allow for the same group of filters which are defined in `timdex_dataset_api.dataset.DatasetFilters`. Examples are shown below.
Source-specific operations are available on composed objects such as
`timdex_dataset.records` and `timdex_dataset.embeddings`.

All read methods for `timdex_dataset.records` allow for the same group of filters.
Examples are shown below.

```python
# read a single row, no filtering
single_record_dict = next(timdex_dataset.read_dicts_iter())
# read a single record row, no filtering
single_record_dict = next(timdex_dataset.records.read_dicts_iter())


# get batches of records, filtering to a particular run
for batch in timdex_dataset.read_batches_iter(
for batch in timdex_dataset.records.read_batches_iter(
source="alma",
run_date="2025-06-01",
run_id="abc123"
@@ -123,7 +117,7 @@ for batch in timdex_dataset.read_batches_iter(

# use convenience method to yield only transformed records
# NOTE: this is what TIM uses for indexing to Opensearch for a given ETL run
for transformed_record in timdex_dataset.read_transformed_records_iter(
for transformed_record in timdex_dataset.records.read_transformed_records_iter(
source="aspace",
run_date="2025-06-01",
run_id="ghi789"
@@ -133,7 +127,7 @@ for transformed_record in timdex_dataset.read_transformed_records_iter(

# load all records for a given run into a pandas dataframe
# NOTE: this can be potentially expensive memory-wise if the run is large
run_df = timdex_dataset.read_dataframe(
run_df = timdex_dataset.records.read_dataframe(
source="dspace",
run_date="2025-06-01",
run_id="def456"
@@ -146,7 +140,9 @@ See [docs/reading.md](docs/reading.md) for more information.

At this time, the only application that writes to the ETL parquet dataset is Transmogrifier.

To write records to the dataset, you must prepare an iterator of `timdex_dataset_api.record.DatasetRecord`. Here is some pseudocode for how a dataset write can work:
To write records to the dataset, you must prepare an iterator of
`timdex_dataset_api.records.DatasetRecord`. Here is some pseudocode for how a
record dataset write can work:

```python
from timdex_dataset_api import DatasetRecord, TIMDEXDataset
@@ -171,5 +167,5 @@ records_iter = records_to_write_iter()

# finally, perform the write, relying on the library to handle efficient batching
timdex_dataset = TIMDEXDataset("/path/to/dataset")
timdex_dataset.write(records_iter=records_iter)
timdex_dataset.records.write(records_iter)
```
78 changes: 49 additions & 29 deletions docs/reading.md
@@ -1,28 +1,35 @@
# Reading data from TIMDEXDataset

This guide explains how `TIMDEXDataset` read methods work and how to use them effectively.
This guide explains how `TIMDEXDataset` data source read methods work and how to use them effectively.

- `TIMDEXDataset` and `TIMDEXDatasetMetadata` both maintain an in-memory DuckDB context. You can issue DuckDB SQL against the views/tables they create.
- `TIMDEXDataset` maintains an in-memory DuckDB context. You can issue DuckDB SQL against the views/tables it creates.
- Source-specific read methods are exposed on `timdex_dataset.records` and `timdex_dataset.embeddings`.
- Read methods use a two-step query flow for performance:
1) a metadata query determines which Parquet files and row offsets are relevant
2) a data query reads just those rows and returns the requested columns
- Prefer simple key/value `DatasetFilters` for most use cases; add a `where=` SQL predicate when you need more advanced logic (e.g., ranges, `BETWEEN`, `>`, `<`, `IN`).
- Prefer simple key/value filters for most use cases; add a `where=` SQL predicate when you need more advanced logic (e.g., ranges, `BETWEEN`, `>`, `<`, `IN`).
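
The two-step flow above can be sketched as plain SQL strings. The table and column names follow the sequence diagram later in this guide; the helper functions themselves are illustrative only (the library builds its queries internally):

```python
def build_meta_query(source: str, run_date: str, run_id: str) -> str:
    """Step 1: metadata query — find the relevant files and row offsets."""
    # note: a real implementation would bind parameters rather than interpolate
    return (
        "SELECT r.filename, r.run_record_offset "
        "FROM metadata.records r "
        f"WHERE r.source = '{source}' "
        f"AND r.run_date = '{run_date}' "
        f"AND r.run_id = '{run_id}' "
        "ORDER BY r.filename, r.run_record_offset"
    )


def build_data_query(filenames: list[str]) -> str:
    """Step 2: data query — read only the matched Parquet files."""
    file_list = ", ".join(f"'{f}'" for f in filenames)
    return f"SELECT * FROM read_parquet([{file_list}])"


meta_sql = build_meta_query("dspace", "2025-09-01", "abc123")
data_sql = build_data_query(["s3://bucket/dataset/part-0001.parquet"])
```

Because step 1 touches only compact metadata, the expensive Parquet scan in step 2 is restricted to the files (and offsets) that can actually match.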

## Available read methods

The shared read methods below are available on both `timdex_dataset.records` and
`timdex_dataset.embeddings`:

- `read_batches_iter(...)`: yields `pyarrow.RecordBatch`
- `read_dicts_iter(...)`: yields Python `dict` per row
- `read_dataframe(...)`: returns a pandas `DataFrame`
- `read_dataframes_iter(...)`: yields pandas `DataFrame` batches

Additionally, `timdex_dataset.records` provides:

- `read_transformed_records_iter(...)`: yields `transformed_record` dictionaries only

All accept the same `DatasetFilters` and the optional `where=` SQL predicate.
All accept the same key/value filters and the optional `where=` SQL predicate.

## Filters vs. where=

- `DatasetFilters` are key/value arguments on read methods. They are validated and translated into SQL and will cover most queries.
- Key/value filters are keyword arguments on read methods. They are validated and translated into SQL and will cover most queries.
- Examples: `source="alma"`, `run_date="2024-12-01"`, `run_type="daily"`, `action="index"`
- `where=` is an optional raw SQL WHERE predicate string, combined with `DatasetFilters` using `AND`. Use it for:
- `where=` is an optional raw SQL WHERE predicate string, combined with these filters using `AND`. Use it for:
- date/time ranges (BETWEEN, >, <)
- set membership (IN (...))
- complex boolean logic (AND/OR grouping)
@@ -46,7 +53,7 @@ This pattern keeps reads fast and memory-efficient even for large datasets.
The following diagram shows the flow for an example query:

```python
for record_dict in td.read_dicts_iter(
for record_dict in td.records.read_dicts_iter(
table="records",
source="dspace",
run_date="2025-09-01",
@@ -65,7 +72,7 @@ sequenceDiagram
participant P as Parquet files

U->>TD: Perform query
Note left of TD: read_dicts_iter(<br>table="records",<br>source="dspace",<br>run_date="2025-09-01",<br>run_id="abc123")
Note left of TD: records.read_dicts_iter(<br>table="records",<br>source="dspace",<br>run_date="2025-09-01",<br>run_id="abc123")
TD->>TDM: build_meta_query(table, filters, where=None)
Note right of TDM: (Metadata Query)<br><br>SELECT r.timdex_record_id, r.run_id, r.filename, r.run_record_offset<br>FROM metadata.records r<br>WHERE r.source = 'dspace'<br>AND r.run_date = '2025-09-01'<br>AND r.run_id = 'abc123'<br>ORDER BY r.filename, r.run_record_offset

@@ -88,75 +95,88 @@ from timdex_dataset_api import TIMDEXDataset
td = TIMDEXDataset("s3://my-bucket/timdex-dataset") # example instance

# 1) Get a single record as a dict
first = next(td.read_dicts_iter())
first = next(td.records.read_dicts_iter())

# 2) Read batches with simple filters
for batch in td.read_batches_iter(source="alma", run_date="2025-06-01", run_id="abc123"):
for batch in td.records.read_batches_iter(
source="alma",
run_date="2025-06-01",
run_id="abc123",
):
... # process pyarrow.RecordBatch

# 3) DataFrame of one run
df = td.read_dataframe(source="dspace", run_date="2025-06-01", run_id="def456")
df = td.records.read_dataframe(
source="dspace",
run_date="2025-06-01",
run_id="def456",
)

# 4) Only transformed records (used by indexer)
for rec in td.read_transformed_records_iter(source="aspace", run_type="daily"):
for rec in td.records.read_transformed_records_iter(
source="aspace",
run_type="daily",
):
... # rec is a dict of the transformed_record
```

## `where=` examples

Advanced filtering that complements `DatasetFilters`.
Advanced filtering that complements key/value filters.

```python
# date range with BETWEEN
where = "run_date BETWEEN '2024-12-01' AND '2024-12-31'"
df = td.read_dataframe(source="alma", where=where)
df = td.records.read_dataframe(source="alma", where=where)

# greater-than on a timestamp (if present in columns)
where = "run_timestamp > '2024-12-01T10:00:00Z'"
df = td.read_dataframe(source="aspace", run_type="daily", where=where)
df = td.records.read_dataframe(source="aspace", run_type="daily", where=where)

# combine set membership and action
where = "run_id IN ('run-1', 'run-3', 'run-5') AND action = 'index'"
df = td.read_dataframe(source="alma", where=where)
df = td.records.read_dataframe(source="alma", where=where)

# combine filters (AND) with where=
where = "run_type = 'daily' AND action = 'index'"
df = td.read_dataframe(source="libguides", where=where)
df = td.records.read_dataframe(source="libguides", where=where)
```

Validation tips:
- Use only a predicate (no SELECT/FROM, no trailing semicolon).
- Column names must exist in the target table/view (e.g., records or current_records).
- `DatasetFilters` + `where=` are ANDed; if the combination yields zero rows, you’ll get an empty result.
- Key/value filters + `where=` are ANDed; if the combination yields zero rows, you’ll get an empty result.
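
The validation tips above could be enforced with a small guard. This is a hypothetical helper, not part of the library:

```python
def validate_where(where: str, known_columns: set[str]) -> str:
    """Reject obviously malformed where= predicates per the tips above."""
    predicate = where.strip()
    if predicate.endswith(";"):
        raise ValueError("where= must not end with a semicolon")
    lowered = predicate.lower()
    # a predicate should never be a full query
    if lowered.startswith("select") or " from " in f" {lowered} ":
        raise ValueError("where= must be a bare predicate, not a full query")
    # naive check: at least one known column should appear in the predicate
    if not any(col in predicate for col in known_columns):
        raise ValueError("where= references no known columns")
    return predicate


columns = {"source", "run_date", "run_id", "run_type", "action"}
ok = validate_where("run_date BETWEEN '2024-12-01' AND '2024-12-31'", columns)
```

A substring check like this is deliberately loose; a real validator would parse the predicate, but even this level of guarding catches the common mistakes listed above.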

## Choosing a table

By default, read methods query the `records` view (all versions). To get only the latest version per `timdex_record_id`, target the `current_records` view:
For `timdex_dataset.records`, read methods query the `records` table by default (all versions). To get only the latest version per `timdex_record_id`, target the `current_records` view:

```python
# ALL records in the 'libguides' source
all_libguides_df = td.read_dataframe(table="records", source="libguides")
all_libguides_df = td.records.read_dataframe(table="records", source="libguides")

# latest unique records across the dataset
current_df = td.read_dataframe(table="current_records")
current_df = td.records.read_dataframe(table="current_records")

# current records for a source and specific run
current_df = td.read_dataframe(table="current_records", source="alma", run_id="run-5")
current_df = td.records.read_dataframe(
table="current_records",
source="alma",
run_id="run-5",
)
```

## DuckDB context

- `TIMDEXDataset` exposes a DuckDB connection used for data queries against Parquet.
- `TIMDEXDatasetMetadata` exposes a DuckDB connection used for metadata queries and provides views:
- `metadata.records`: all record versions with run metadata
- `metadata.current_records`: latest record per `timdex_record_id`
- `metadata.append_deltas`: incremental write tracking
- `TIMDEXDataset` exposes a DuckDB connection used for metadata and data queries against Parquet.
- `TIMDEXDataSource` provides a base class that data sources extend.
- Each data source class defines the "tables" available for that source in the `metadata` schema.

You can execute raw DuckDB SQL for inspection and debugging:

```python
# access metadata connection
conn = td.metadata.conn # DuckDB connection
# access dataset DuckDB connection
conn = td.conn # DuckDB connection

# peek at view schemas
print(conn.sql("DESCRIBE metadata.records").to_df())
5 changes: 3 additions & 2 deletions migrations/001_2025_05_30_backfill_run_timestamp_column.py
@@ -41,7 +41,8 @@
from pyarrow import fs

from timdex_dataset_api.config import configure_dev_logger, configure_logger
from timdex_dataset_api.dataset import TIMDEX_DATASET_SCHEMA, TIMDEXDataset
from timdex_dataset_api.dataset import TIMDEXDataset
from timdex_dataset_api.records import TIMDEXRecords

configure_dev_logger()

@@ -125,7 +126,7 @@ def backfill_parquet_file(

# Create run_timestamp column using the exact schema definition
num_rows = len(table)
run_timestamp_field = TIMDEX_DATASET_SCHEMA.field("run_timestamp")
run_timestamp_field = TIMDEXRecords.SCHEMA.field("run_timestamp")
run_timestamp_array = pa.array(
[creation_date] * num_rows, type=run_timestamp_field.type
)
@@ -50,7 +50,8 @@

from timdex_dataset_api import TIMDEXDatasetMetadata
from timdex_dataset_api.config import configure_dev_logger, configure_logger
from timdex_dataset_api.dataset import TIMDEX_DATASET_SCHEMA, TIMDEXDataset
from timdex_dataset_api.dataset import TIMDEXDataset
from timdex_dataset_api.records import TIMDEXRecords

configure_dev_logger()

@@ -174,7 +175,7 @@ def backfill_parquet_file(

# set new run_timestamp value
num_rows = len(table)
run_timestamp_field = TIMDEX_DATASET_SCHEMA.field("run_timestamp")
run_timestamp_field = TIMDEXRecords.SCHEMA.field("run_timestamp")
new_run_timestamp_array = pa.array(
[new_run_timestamp] * num_rows, type=run_timestamp_field.type
)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "timdex_dataset_api"
version = "4.1.0"
version = "5.0.0"
description = "Python library for interacting with a TIMDEX parquet dataset"
readme = "README.md"
requires-python = ">=3.12"