
Commit d6c7fea

Merge pull request #38 from thetradedesk/upgrade-python-version-to-3.12-and-update-README-and-example-notebook
Ensure tests run across combinations of python and spark versions and update readme and example notebook
2 parents 03e052a + a0f0437 commit d6c7fea

8 files changed: 172 additions & 29 deletions


.github/workflows/build-and-publish.yaml

Lines changed: 43 additions & 9 deletions

```diff
@@ -19,32 +19,66 @@ on:
         default: 'CRITICAL,HIGH'
 
 jobs:
-  unit-tests:
+  python-unit-tests:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.10","3.11","3.12","3.13"]
+        python-version: ["3.10", "3.11", "3.12", "3.13"]
 
     steps:
       - uses: actions/checkout@v4
 
       - name: Set up Python ${{ matrix.python-version }}
         uses: actions/setup-python@v4
         with:
           python-version: ${{ matrix.python-version }}
 
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
           pip install -e .[dev]
 
-      - name: Run unit tests
+      - name: Run pure unit tests
+        run: |
+          pytest tests/ -m "not spark"
+
+  spark-compatibility-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - python-version: "3.10"
+            pyspark-version: "3.4.0"
+          - python-version: "3.10"
+            pyspark-version: "3.5.0"
+          - python-version: "3.11"
+            pyspark-version: "3.5.0"
+          - python-version: "3.12"
+            pyspark-version: "4.0.0"
+          - python-version: "3.13"
+            pyspark-version: "4.0.0"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e .[dev]
+          pip install "pyspark==${{ matrix.pyspark-version }}"
+
+      - name: Run Spark tests
         run: |
-          python -m unittest tests/*.py
+          pytest tests/ -m spark
 
   build-and-pubish:
     name: Build and publish Python packages to PyPi
-    needs: unit-tests
+    needs: [python-unit-tests, spark-compatibility-tests]
     uses: ./.github/workflows/publish-to-pypi-versioned.yaml
     with:
       release_type: ${{ inputs.release_type }}
```
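The Spark jobs install a pinned `pyspark` wheel and then run only the tests marked `spark`. The workflow does not show how those tests obtain a SparkSession; below is a minimal sketch of a session-scoped fixture they could share (a hypothetical `conftest.py`, assuming a plain local-mode session, not necessarily how this repository wires it up):

```python
# conftest.py -- hypothetical sketch, not taken from this repository
import pytest

try:
    from pyspark.sql import SparkSession
except ImportError:  # the "not spark" job may not have pyspark installed at all
    SparkSession = None


@pytest.fixture(scope="session")
def spark():
    """Local SparkSession shared by every test marked @pytest.mark.spark."""
    if SparkSession is None:
        pytest.skip("pyspark is not installed")
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("ttd-databricks-python-tests")
        .getOrCreate()
    )
    yield session
    session.stop()
```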

.github/workflows/ci.yaml

Lines changed: 40 additions & 4 deletions

```diff
@@ -34,8 +34,8 @@ jobs:
       - name: Mypy
         run: mypy ttd_databricks_python
 
-  unit-tests:
-    name: Unit tests (Python ${{ matrix.python-version }})
+  python-unit-tests:
+    name: Python unit tests (${{ matrix.python-version }})
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -56,5 +56,41 @@ jobs:
           python -m pip install --upgrade pip
           pip install -e .[dev]
 
-      - name: Run unit tests
-        run: pytest tests/
+      - name: Run pure unit tests
+        run: pytest tests/ -m "not spark"
+
+  spark-compatibility-tests:
+    name: Spark compatibility tests (Python ${{ matrix.python-version }}, PySpark ${{ matrix.pyspark-version }})
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - python-version: "3.10"
+            pyspark-version: "3.4.0"
+          - python-version: "3.10"
+            pyspark-version: "3.5.0"
+          - python-version: "3.11"
+            pyspark-version: "3.5.0"
+          - python-version: "3.12"
+            pyspark-version: "4.0.0"
+          - python-version: "3.13"
+            pyspark-version: "4.0.0"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+          cache: "pip"
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e .[dev]
+          pip install "pyspark==${{ matrix.pyspark-version }}"
+
+      - name: Run Spark tests
+        run: pytest tests/ -m spark
```

README.md

Lines changed: 2 additions & 3 deletions

````diff
@@ -326,11 +326,10 @@ The schema for `TTDEndpoint.DELETION_OPTOUT_MERCHANT` is defined in [deletion_op
 Retrieve the full input schema for an endpoint:
 
 ```python
-from ttd_databricks_python.ttd_databricks import TTDEndpoint
-from ttd_databricks_python.ttd_databricks.schemas import get_ttd_input_schema
+from ttd_databricks_python.ttd_databricks import TTDEndpoint, get_ttd_input_schema
 
 schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER)
-schema.printTreeString()
+print(schema.treeString())
 ```
 
 Get just the required column names (useful for DataFrame preparation):
````
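The README now prints the schema with `print(schema.treeString())`. If `treeString()` is unavailable in a given PySpark version, the same information can be read from the `StructType` directly; a minimal sketch is below (the `required_columns` heuristic treats non-nullable fields as required, which is an assumption, not something this excerpt states):

```python
from ttd_databricks_python.ttd_databricks import TTDEndpoint, get_ttd_input_schema

schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER)

# Works on any PySpark version: iterate the StructType instead of calling treeString()
for field in schema.fields:
    print(f"{field.name}: {field.dataType.simpleString()} (nullable={field.nullable})")

# Assumption: non-nullable fields are the required input columns
required_columns = [f.name for f in schema.fields if not f.nullable]
print("Required columns:", required_columns)
```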

example_notebook/TTD Connector Data SDK Example Notebook.ipynb

Lines changed: 76 additions & 13 deletions

```diff
@@ -22,7 +22,10 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "%pip install ttd-databricks"
+    "%pip install ttd-databricks\n",
+    "\n",
+    "# Recommended to restart the kernel to use updated packages\n",
+    "dbutils.library.restartPython()"
    ]
   },
   {
@@ -109,7 +112,16 @@
    "\n",
    "Use `get_ttd_input_schema()` to see which columns your DataFrame must contain.\n",
    "\n",
-    "For more information on the meaning of particular fields, see [openTTD](https://open.thetradedesk.com/advertiser/docsApp/GuidesAdvertiser/data/doc/post-data-advertiser-firstparty)."
+    "For more information on the meaning of particular fields and the data types supported per endpoint, refer to the following table:\n",
+    "\n",
+    "| Endpoint | Context | Data API | Documentation |\n",
+    "|---|---|---|---|\n",
+    "| Advertiser | `AdvertiserContext` | `POST /data/advertiser` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-advertiser-external) |\n",
+    "| Third Party | `ThirdPartyContext` | `POST /data/thirdparty` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-thirdparty) |\n",
+    "| Offline Conversion | `OfflineConversionContext` | `POST /providerapi/offlineconversion` | [OpenTTD](https://open.thetradedesk.com/advertiser/docsApp/GuidesAdvertiser/data/doc/post-providerapi-offlineconversion) |\n",
+    "| Deletion / Opt-Out — Advertiser | `DeletionOptOutAdvertiserContext` | `POST /data/deletion-optout/advertiser` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-deletion-optout-advertiser-external) |\n",
+    "| Deletion / Opt-Out — Third Party | `DeletionOptOutThirdPartyContext` | `POST /data/deletion-optout/thirdparty` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-deletion-optout-thirdparty) |\n",
+    "| Deletion / Opt-Out — Merchant | `DeletionOptOutMerchantContext` | `POST /data/deletion-optout/merchant` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/retail/doc/post-data-deletion-optout-merchant) |"
    ]
   },
   {
@@ -120,7 +132,9 @@
    "source": [
    "input_schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER)\n",
    "print(\"Required input schema:\")\n",
-    "input_schema.printTreeString()"
+    "\n",
+    "for field in input_schema.fields:\n",
+    "    print(f\"  {field.name}: {field.dataType.simpleString()} (nullable={field.nullable})\")"
    ]
   },
   {
@@ -142,11 +156,15 @@
    "# Example: create a small sample DataFrame\n",
    "# In practice, read from a Delta table or other data source\n",
    "sample_data = [\n",
-    "    (\"tdid-001\", \"seg-001\"),\n",
-    "    (\"tdid-002\", \"seg-002\"),\n",
-    "    (\"tdid-003\", \"seg-001\"),\n",
+    "    {\"id_type\": \"tdid\", \"id_value\": \"a3f1c2d4-8e7b-4f6a-9c0d-1b2e3f4a5b6c\", \"segment_name\": \"segment_1\"},\n",
+    "    {\"id_type\": \"daid\", \"id_value\": \"7d9e0f1a-2b3c-4d5e-6f7a-8b9c0d1e2f3a\", \"segment_name\": \"segment_2\"},\n",
+    "    # intentionally incorrect format for ramp_id to showcase error entries in output\n",
+    "    {\"id_type\": \"ramp_id\", \"id_value\": \"c4d5e6f7-a8b9-4c0d-1e2f-3a4b5c6d7e8f\", \"segment_name\": \"segment_3\"},\n",
+    "    {\"id_type\": \"tdid\", \"id_value\": \"1f2a3b4c-5d6e-4f7a-8b9c-0d1e2f3a4b5c\", \"segment_name\": \"segment_4\"},\n",
+    "    {\"id_type\": \"daid\", \"id_value\": \"9b0c1d2e-3f4a-4b5c-6d7e-8f9a0b1c2d3e\", \"segment_name\": \"segment_5\"},\n",
    "]\n",
-    "input_df = spark.createDataFrame(sample_data, schema=input_schema)\n",
+    "\n",
+    "input_df = spark.createDataFrame(sample_data)\n",
    "display(input_df)"
    ]
   },
@@ -173,7 +191,7 @@
    "    result_df = client.push_data(\n",
    "        df=input_df,\n",
    "        context=context,\n",
-    "        batch_size=1600,\n",
+    "        batch_size=1600,  # Number of rows batched together in a single request to The Trade Desk\n",
    "    )\n",
    "    display(result_df)\n",
    "except TTDSchemaValidationError as e:\n",
@@ -226,33 +244,78 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Create tables with default names and managed storage\n",
+    "# Creates three managed Delta tables in the active catalog/database.\n",
+    "# Default names: ttd_advertiser_input, ttd_advertiser_output, ttd_metadata\n",
+    "# Pass table_name= and location= to use custom names or external storage.\n",
    "input_table = client.setup_input_table(endpoint=TTDEndpoint.ADVERTISER)\n",
    "output_table = client.setup_output_table(endpoint=TTDEndpoint.ADVERTISER)\n",
-    "metadata_table = client.setup_metadata_table()\n",
+    "metadata_table = client.setup_metadata_table(table_name=\"ttd_advertiser_metadata\")\n",
    "\n",
    "print(f\"Input table: {input_table}\")\n",
    "print(f\"Output table: {output_table}\")\n",
    "print(f\"Metadata table: {metadata_table}\")"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import functions as F\n",
+    "\n",
+    "# This cell simulates your upstream pipeline writing records to the input table.\n",
+    "# The user is responsible for writing into the input table; the SDK only reads from it.\n",
+    "\n",
+    "# updated_at is required for incremental processing: batch_process uses it\n",
+    "# to filter rows added since the last run when process_new_records_only=True.\n",
+    "# The user is responsible for setting the updated_at value for entries in the input table.\n",
+    "\n",
+    "(\n",
+    "    spark.createDataFrame(sample_data)\n",
+    "    .withColumn(\"updated_at\", F.current_timestamp())\n",
+    "    .write.format(\"delta\")\n",
+    "    .mode(\"append\")\n",
+    "    .saveAsTable(input_table)\n",
+    ")\n",
+    "\n",
+    "display(spark.table(input_table))"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
    "# Run batch processing (reads from input_table, writes to output_table)\n",
+    "\n",
+    "# process_new_records_only=True filters to rows where updated_at > last run date.\n",
+    "# On the first run, metadata_table is empty so all rows are processed.\n",
    "client.batch_process(\n",
    "    context=context,\n",
    "    input_table=input_table,\n",
    "    output_table=output_table,\n",
    "    metadata_table=metadata_table,\n",
-    "    process_new_records_only=True,  # incremental: only rows newer than last run\n",
-    "    batch_size=1600,\n",
-    "    parallelism=16,\n",
+    "    process_new_records_only=True,  # Processes rows updated after last run; all rows on first run\n",
+    "    batch_size=1600,  # Number of rows grouped together in a single request to The Trade Desk\n",
+    "    parallelism=16,  # Number of parallel workers processing the entries from the input table\n",
    ")"
    ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Display the output table\n",
+    "display(spark.table(output_table))\n",
+    "\n",
+    "# Display the metadata table\n",
+    "display(spark.table(metadata_table))"
+   ]
   }
  ],
  "metadata": {
```

pyproject.toml

Lines changed: 5 additions & 0 deletions

```diff
@@ -58,6 +58,11 @@ ignore = [
 [tool.ruff.lint.isort]
 known-first-party = ["ttd_databricks_python"]
 
+[tool.pytest.ini_options]
+markers = [
+    "spark: marks tests that require a running SparkSession (deselect with '-m \"not spark\"')",
+]
+
 [tool.ruff.format]
 quote-style = "double"
 indent-style = "space"
```
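Registering the marker here keeps pytest from warning about an unknown `spark` mark and documents how to deselect it. The test modules in this PR opt in with a module-level `pytestmark`; a short sketch of both spellings (hypothetical module, not from this repository):

```python
import pytest

# Module-level: every test in this file is treated as a Spark test
pytestmark = pytest.mark.spark


# Per-test: mark only the tests that actually need a SparkSession
@pytest.mark.spark
def test_something_with_spark():
    ...


# Selection then happens on the command line:
#   pytest tests/ -m "not spark"   # pure unit tests
#   pytest tests/ -m spark         # Spark compatibility tests
```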

tests/unit/test_batch_process_early_exit.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -19,6 +19,8 @@
 from ttd_databricks_python.ttd_databricks.exceptions import TTDConfigurationError
 from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient
 
+pytestmark = pytest.mark.spark
+
 _CONTEXT = AdvertiserContext(advertiser_id="adv123")
 _REQUIRED_SCHEMA = StructType(
     [
```

tests/unit/test_client_helpers.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -13,6 +13,8 @@
 from ttd_databricks_python.ttd_databricks.schemas import get_metadata_schema
 from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient
 
+pytestmark = pytest.mark.spark
+
 
 def _make_client(**kwargs) -> TtdDatabricksClient:  # type: ignore[no-untyped-def]
     return TtdDatabricksClient(
```

tests/unit/test_push_data.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -16,6 +16,8 @@
 from ttd_databricks_python.ttd_databricks.exceptions import TTDSchemaValidationError
 from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient
 
+pytestmark = pytest.mark.spark
+
 _CONTEXT = AdvertiserContext(advertiser_id="adv123")
 _REQUIRED_SCHEMA = StructType(
     [
```
