52 changes: 43 additions & 9 deletions .github/workflows/build-and-publish.yaml
@@ -19,32 +19,66 @@ on:
default: 'CRITICAL,HIGH'

jobs:
unit-tests:
python-unit-tests:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10","3.11","3.12","3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[dev]

- name: Run unit tests

- name: Run pure unit tests
run: |
pytest tests/ -m "not spark"

spark-compatibility-tests:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- python-version: "3.10"
pyspark-version: "3.4.0"
- python-version: "3.10"
pyspark-version: "3.5.0"
- python-version: "3.11"
pyspark-version: "3.5.0"
- python-version: "3.12"
pyspark-version: "4.0.0"
- python-version: "3.13"
pyspark-version: "4.0.0"

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[dev]
pip install "pyspark==${{ matrix.pyspark-version }}"

- name: Run Spark tests
run: |
python -m unittest tests/*.py
pytest tests/ -m spark

build-and-pubish:
name: Build and publish Python packages to PyPi
needs: unit-tests
needs: [python-unit-tests, spark-compatibility-tests]
uses: ./.github/workflows/publish-to-pypi-versioned.yaml
with:
release_type: ${{ inputs.release_type }}
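The `spark-compatibility-tests` matrix above pins exactly which PySpark line is exercised on each Python version. As a minimal sketch, the same support matrix can be expressed as a lookup in code — the version pairings are copied from the workflow, while the dictionary and function names are illustrative, not part of the SDK:

```python
# Pairings copied from the spark-compatibility-tests matrix above;
# the lookup itself is illustrative, not part of the SDK.
SUPPORTED_PYSPARK = {
    "3.10": ["3.4.0", "3.5.0"],
    "3.11": ["3.5.0"],
    "3.12": ["4.0.0"],
    "3.13": ["4.0.0"],
}

def is_tested(python_version: str, pyspark_version: str) -> bool:
    """Return True if CI exercises this Python/PySpark pairing."""
    return pyspark_version in SUPPORTED_PYSPARK.get(python_version, [])

print(is_tested("3.12", "4.0.0"))  # True
print(is_tested("3.10", "4.0.0"))  # False
```

This mirrors the `matrix.include` style used in the workflow, where only explicitly listed combinations are built rather than the full cross product.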
44 changes: 40 additions & 4 deletions .github/workflows/ci.yaml
@@ -34,8 +34,8 @@ jobs:
- name: Mypy
run: mypy ttd_databricks_python

unit-tests:
name: Unit tests (Python ${{ matrix.python-version }})
python-unit-tests:
name: Python unit tests (${{ matrix.python-version }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
@@ -56,5 +56,41 @@ jobs:
python -m pip install --upgrade pip
pip install -e .[dev]

- name: Run unit tests
run: pytest tests/
- name: Run pure unit tests
run: pytest tests/ -m "not spark"

spark-compatibility-tests:
name: Spark compatibility tests (Python ${{ matrix.python-version }}, PySpark ${{ matrix.pyspark-version }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- python-version: "3.10"
pyspark-version: "3.4.0"
- python-version: "3.10"
pyspark-version: "3.5.0"
- python-version: "3.11"
pyspark-version: "3.5.0"
- python-version: "3.12"
pyspark-version: "4.0.0"
- python-version: "3.13"
pyspark-version: "4.0.0"

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: "pip"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[dev]
pip install "pyspark==${{ matrix.pyspark-version }}"

- name: Run Spark tests
run: pytest tests/ -m spark
5 changes: 2 additions & 3 deletions README.md
@@ -326,11 +326,10 @@ The schema for `TTDEndpoint.DELETION_OPTOUT_MERCHANT` is defined in [deletion_op
Retrieve the full input schema for an endpoint:

```python
from ttd_databricks_python.ttd_databricks import TTDEndpoint
from ttd_databricks_python.ttd_databricks.schemas import get_ttd_input_schema
from ttd_databricks_python.ttd_databricks import TTDEndpoint, get_ttd_input_schema

schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER)
schema.printTreeString()
print(schema.treeString())
```

Get just the required column names (useful for DataFrame preparation):
89 changes: 76 additions & 13 deletions example_notebook/TTD Connector Data SDK Example Notebook.ipynb
@@ -22,7 +22,10 @@
"metadata": {},
"outputs": [],
"source": [
"%pip install ttd-databricks"
"%pip install ttd-databricks\n",
"\n",
"# Recommended: restart the kernel to use the updated packages\n",
"dbutils.library.restartPython()"
]
},
{
@@ -109,7 +112,16 @@
"\n",
"Use `get_ttd_input_schema()` to see which columns your DataFrame must contain.\n",
"\n",
"For more information on the meaning of particular fields, see [openTTD](https://open.thetradedesk.com/advertiser/docsApp/GuidesAdvertiser/data/doc/post-data-advertiser-firstparty)."
"For more information on the meaning of particular fields and the data types supported per endpoint, refer to the following table:\n",
"\n",
"| Endpoint | Context | Data API | Documentation |\n",
"|---|---|---|---|\n",
"| Advertiser | `AdvertiserContext` | `POST /data/advertiser` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-advertiser-external) |\n",
"| Third Party | `ThirdPartyContext` | `POST /data/thirdparty` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-thirdparty) |\n",
"| Offline Conversion | `OfflineConversionContext` | `POST /providerapi/offlineconversion` | [OpenTTD](https://open.thetradedesk.com/advertiser/docsApp/GuidesAdvertiser/data/doc/post-providerapi-offlineconversion) |\n",
"| Deletion / Opt-Out — Advertiser | `DeletionOptOutAdvertiserContext` | `POST /data/deletion-optout/advertiser` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-deletion-optout-advertiser-external) |\n",
"| Deletion / Opt-Out — Third Party | `DeletionOptOutThirdPartyContext` | `POST /data/deletion-optout/thirdparty` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-deletion-optout-thirdparty) |\n",
"| Deletion / Opt-Out — Merchant | `DeletionOptOutMerchantContext` | `POST /data/deletion-optout/merchant` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/retail/doc/post-data-deletion-optout-merchant) |"
]
},
{
@@ -120,7 +132,9 @@
"source": [
"input_schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER)\n",
"print(\"Required input schema:\")\n",
"input_schema.printTreeString()"
"\n",
"for field in input_schema.fields:\n",
" print(f\" {field.name}: {field.dataType.simpleString()} (nullable={field.nullable})\")"
]
},
{
@@ -142,11 +156,15 @@
"# Example: create a small sample DataFrame\n",
"# In practice, read from a Delta table or other data source\n",
"sample_data = [\n",
" (\"tdid-001\", \"seg-001\"),\n",
" (\"tdid-002\", \"seg-002\"),\n",
" (\"tdid-003\", \"seg-001\"),\n",
" {\"id_type\": \"tdid\", \"id_value\": \"a3f1c2d4-8e7b-4f6a-9c0d-1b2e3f4a5b6c\", \"segment_name\": \"segment_1\"},\n",
" {\"id_type\": \"daid\", \"id_value\": \"7d9e0f1a-2b3c-4d5e-6f7a-8b9c0d1e2f3a\", \"segment_name\": \"segment_2\"},\n",
" # Intentionally incorrect format for ramp_id to showcase error entries in the output\n",
" {\"id_type\": \"ramp_id\", \"id_value\": \"c4d5e6f7-a8b9-4c0d-1e2f-3a4b5c6d7e8f\", \"segment_name\": \"segment_3\"},\n",
" {\"id_type\": \"tdid\", \"id_value\": \"1f2a3b4c-5d6e-4f7a-8b9c-0d1e2f3a4b5c\", \"segment_name\": \"segment_4\"},\n",
" {\"id_type\": \"daid\", \"id_value\": \"9b0c1d2e-3f4a-4b5c-6d7e-8f9a0b1c2d3e\", \"segment_name\": \"segment_5\"},\n",
"]\n",
"input_df = spark.createDataFrame(sample_data, schema=input_schema)\n",
"\n",
"input_df = spark.createDataFrame(sample_data)\n",
"display(input_df)"
]
},
@@ -173,7 +191,7 @@
" result_df = client.push_data(\n",
" df=input_df,\n",
" context=context,\n",
" batch_size=1600,\n",
" batch_size=1600, # Number of rows batched together in a single request to The Trade Desk\n",
" )\n",
" display(result_df)\n",
"except TTDSchemaValidationError as e:\n",
@@ -226,33 +244,78 @@
"metadata": {},
"outputs": [],
"source": [
"# Create tables with default names and managed storage\n",
"# Creates three managed Delta tables in the active catalog/database.\n",
"# Default names: ttd_advertiser_input, ttd_advertiser_output, ttd_metadata\n",
"# Pass table_name= and location= to use custom names or external storage.\n",
"input_table = client.setup_input_table(endpoint=TTDEndpoint.ADVERTISER)\n",
"output_table = client.setup_output_table(endpoint=TTDEndpoint.ADVERTISER)\n",
"metadata_table = client.setup_metadata_table()\n",
"metadata_table = client.setup_metadata_table(table_name=\"ttd_advertiser_metadata\")\n",
"\n",
"print(f\"Input table: {input_table}\")\n",
"print(f\"Output table: {output_table}\")\n",
"print(f\"Metadata table: {metadata_table}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as F\n",
"\n",
"# This cell simulates your upstream pipeline writing records to the input table\n",
"# The user is responsible for writing to the input table; the SDK only reads from it\n",
"\n",
"\n",
"# updated_at is required for incremental processing: batch_process uses it\n",
"# to filter rows added since the last run when process_new_records_only=True\n",
"# The user is responsible for setting the updated_at value for entries in the input table\n",
"\n",
"(\n",
" spark.createDataFrame(sample_data)\n",
" .withColumn(\"updated_at\", F.current_timestamp())\n",
" .write.format(\"delta\")\n",
" .mode(\"append\")\n",
" .saveAsTable(input_table)\n",
")\n",
"\n",
"display(spark.table(input_table))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Run batch processing (reads from input_table, writes to output_table)\n",
"\n",
"# process_new_records_only=True filters to rows where updated_at > last run date\n",
"# On the first run, metadata_table is empty so all rows are processed\n",
"client.batch_process(\n",
" context=context,\n",
" input_table=input_table,\n",
" output_table=output_table,\n",
" metadata_table=metadata_table,\n",
" process_new_records_only=True, # incremental: only rows newer than last run\n",
" batch_size=1600,\n",
" parallelism=16,\n",
" process_new_records_only=True, # Processes rows updated after last run; processes all rows on first run\n",
" batch_size=1600, # Number of rows grouped together in a single request to The Trade Desk\n",
" parallelism=16, # Number of parallel workers processing entries from the input table\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Display the output table\n",
"display(spark.table(output_table))\n",
"\n",
"# Display the metadata table\n",
"display(spark.table(metadata_table))"
]
}
],
"metadata": {
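The notebook's `batch_process` cells rely on an `updated_at` timestamp for incremental processing: with `process_new_records_only=True`, only rows newer than the last recorded run are picked up. A minimal sketch of that filtering idea — not the SDK's actual implementation, which reads a Delta table and tracks runs in the metadata table; only the `updated_at` column name comes from the notebook:

```python
from datetime import datetime, timedelta

# Hypothetical sketch of the incremental filter applied when
# process_new_records_only=True: keep only rows newer than the last run.
def filter_new_records(rows, last_run):
    # rows: iterable of dicts that include an "updated_at" timestamp;
    # last_run is None on the first run, so every row is processed.
    return [r for r in rows if last_run is None or r["updated_at"] > last_run]

last_run = datetime(2024, 1, 1)
rows = [
    {"id_value": "a", "updated_at": last_run - timedelta(days=1)},  # already processed
    {"id_value": "b", "updated_at": last_run + timedelta(days=1)},  # new since last run
]
print([r["id_value"] for r in filter_new_records(rows, last_run)])  # ['b']
```

This is also why the notebook stresses that the user must populate `updated_at` when writing to the input table: rows without a fresh timestamp would be silently skipped on incremental runs.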
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -58,6 +58,11 @@ ignore = [
[tool.ruff.lint.isort]
known-first-party = ["ttd_databricks_python"]

[tool.pytest.ini_options]
markers = [
"spark: marks tests that require a running SparkSession (deselect with '-m \"not spark\"')",
]

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
2 changes: 2 additions & 0 deletions tests/unit/test_batch_process_early_exit.py
@@ -19,6 +19,8 @@
from ttd_databricks_python.ttd_databricks.exceptions import TTDConfigurationError
from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient

pytestmark = pytest.mark.spark

_CONTEXT = AdvertiserContext(advertiser_id="adv123")
_REQUIRED_SCHEMA = StructType(
[
2 changes: 2 additions & 0 deletions tests/unit/test_client_helpers.py
@@ -13,6 +13,8 @@
from ttd_databricks_python.ttd_databricks.schemas import get_metadata_schema
from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient

pytestmark = pytest.mark.spark


def _make_client(**kwargs) -> TtdDatabricksClient: # type: ignore[no-untyped-def]
return TtdDatabricksClient(
2 changes: 2 additions & 0 deletions tests/unit/test_push_data.py
@@ -16,6 +16,8 @@
from ttd_databricks_python.ttd_databricks.exceptions import TTDSchemaValidationError
from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient

pytestmark = pytest.mark.spark

_CONTEXT = AdvertiserContext(advertiser_id="adv123")
_REQUIRED_SCHEMA = StructType(
[