diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index 1e05ae5..ae576ca 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -19,32 +19,66 @@ on: default: 'CRITICAL,HIGH' jobs: - unit-tests: + python-unit-tests: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.10","3.11","3.12","3.13"] - + python-version: ["3.10", "3.11", "3.12", "3.13"] + steps: - uses: actions/checkout@v4 - + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - + - name: Install dependencies run: | python -m pip install --upgrade pip pip install -e .[dev] - - - name: Run unit tests + + - name: Run pure unit tests + run: | + pytest tests/ -m "not spark" + + spark-compatibility-tests: + runs-on: ubuntu-latest + strategy: + matrix: + include: + - python-version: "3.10" + pyspark-version: "3.4.0" + - python-version: "3.10" + pyspark-version: "3.5.0" + - python-version: "3.11" + pyspark-version: "3.5.0" + - python-version: "3.12" + pyspark-version: "4.0.0" + - python-version: "3.13" + pyspark-version: "4.0.0" + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e .[dev] + pip install "pyspark==${{ matrix.pyspark-version }}" + + - name: Run Spark tests run: | - python -m unittest tests/*.py + pytest tests/ -m spark build-and-pubish: name: Build and publish Python packages to PyPi - needs: unit-tests + needs: [python-unit-tests, spark-compatibility-tests] uses: ./.github/workflows/publish-to-pypi-versioned.yaml with: release_type: ${{ inputs.release_type }} diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4be5b8a..6de6def 100644 --- a/.github/workflows/ci.yaml +++ 
b/.github/workflows/ci.yaml @@ -34,8 +34,8 @@ jobs: - name: Mypy run: mypy ttd_databricks_python - unit-tests: - name: Unit tests (Python ${{ matrix.python-version }}) + python-unit-tests: + name: Python unit tests (${{ matrix.python-version }}) runs-on: ubuntu-latest strategy: fail-fast: false @@ -56,5 +56,41 @@ jobs: python -m pip install --upgrade pip pip install -e .[dev] - - name: Run unit tests - run: pytest tests/ + - name: Run pure unit tests + run: pytest tests/ -m "not spark" + + spark-compatibility-tests: + name: Spark compatibility tests (Python ${{ matrix.python-version }}, PySpark ${{ matrix.pyspark-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - python-version: "3.10" + pyspark-version: "3.4.0" + - python-version: "3.10" + pyspark-version: "3.5.0" + - python-version: "3.11" + pyspark-version: "3.5.0" + - python-version: "3.12" + pyspark-version: "4.0.0" + - python-version: "3.13" + pyspark-version: "4.0.0" + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e .[dev] + pip install "pyspark==${{ matrix.pyspark-version }}" + + - name: Run Spark tests + run: pytest tests/ -m spark diff --git a/README.md b/README.md index 9d5be4c..9347a0f 100644 --- a/README.md +++ b/README.md @@ -326,11 +326,10 @@ The schema for `TTDEndpoint.DELETION_OPTOUT_MERCHANT` is defined in [deletion_op Retrieve the full input schema for an endpoint: ```python -from ttd_databricks_python.ttd_databricks import TTDEndpoint -from ttd_databricks_python.ttd_databricks.schemas import get_ttd_input_schema +from ttd_databricks_python.ttd_databricks import TTDEndpoint, get_ttd_input_schema schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER) -schema.printTreeString() +print(schema.treeString()) ``` Get just 
the required column names (useful for DataFrame preparation): diff --git a/example_notebook/TTD Connector Data SDK Example Notebook.ipynb b/example_notebook/TTD Connector Data SDK Example Notebook.ipynb index 63062a1..4cb3dab 100644 --- a/example_notebook/TTD Connector Data SDK Example Notebook.ipynb +++ b/example_notebook/TTD Connector Data SDK Example Notebook.ipynb @@ -22,7 +22,10 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install ttd-databricks" + "%pip install ttd-databricks\n", + "\n", + "# Recommended to restart the kernel to use updated packages\n", + "dbutils.library.restartPython()" ] }, { @@ -109,7 +112,16 @@ "\n", "Use `get_ttd_input_schema()` to see which columns your DataFrame must contain.\n", "\n", - "For more information on the meaning of particular fields, see [openTTD](https://open.thetradedesk.com/advertiser/docsApp/GuidesAdvertiser/data/doc/post-data-advertiser-firstparty)." + "For more information on the meaning of particular fields and the data types supported per endpoint, refer to the following table:\n", + "\n", + "| Endpoint | Context | Data API | Documentation |\n", + "|---|---|---|---|\n", + "| Advertiser | `AdvertiserContext` | `POST /data/advertiser` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-advertiser-external) |\n", + "| Third Party | `ThirdPartyContext` | `POST /data/thirdparty` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-thirdparty) |\n", + "| Offline Conversion | `OfflineConversionContext` | `POST /providerapi/offlineconversion` | [OpenTTD](https://open.thetradedesk.com/advertiser/docsApp/GuidesAdvertiser/data/doc/post-providerapi-offlineconversion) |\n", + "| Deletion / Opt-Out — Advertiser | `DeletionOptOutAdvertiserContext` | `POST /data/deletion-optout/advertiser` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-deletion-optout-advertiser-external) |\n", + "| 
Deletion / Opt-Out — Third Party | `DeletionOptOutThirdPartyContext` | `POST /data/deletion-optout/thirdparty` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/audience/doc/post-data-deletion-optout-thirdparty) |\n", + "| Deletion / Opt-Out — Merchant | `DeletionOptOutMerchantContext` | `POST /data/deletion-optout/merchant` | [OpenTTD](https://open.thetradedesk.com/provider/docsApp/GuidesProvider/retail/doc/post-data-deletion-optout-merchant) |" ] }, { @@ -120,7 +132,9 @@ "source": [ "input_schema = get_ttd_input_schema(TTDEndpoint.ADVERTISER)\n", "print(\"Required input schema:\")\n", - "input_schema.printTreeString()" + "\n", + "for field in input_schema.fields:\n", + " print(f\" {field.name}: {field.dataType.simpleString()} (nullable={field.nullable})\")" ] }, { @@ -142,11 +156,15 @@ "# Example: create a small sample DataFrame\n", "# In practice, read from a Delta table or other data source\n", "sample_data = [\n", - " (\"tdid-001\", \"seg-001\"),\n", - " (\"tdid-002\", \"seg-002\"),\n", - " (\"tdid-003\", \"seg-001\"),\n", + " {\"id_type\": \"tdid\", \"id_value\": \"a3f1c2d4-8e7b-4f6a-9c0d-1b2e3f4a5b6c\", \"segment_name\": \"segment_1\"},\n", + " {\"id_type\": \"daid\", \"id_value\": \"7d9e0f1a-2b3c-4d5e-6f7a-8b9c0d1e2f3a\", \"segment_name\": \"segment_2\"},\n", + " # intentionally incorrect format for ramp_id to showcase error entries in output\n", + " {\"id_type\": \"ramp_id\", \"id_value\": \"c4d5e6f7-a8b9-4c0d-1e2f-3a4b5c6d7e8f\", \"segment_name\": \"segment_3\"},\n", + " {\"id_type\": \"tdid\", \"id_value\": \"1f2a3b4c-5d6e-4f7a-8b9c-0d1e2f3a4b5c\", \"segment_name\": \"segment_4\"},\n", + " {\"id_type\": \"daid\", \"id_value\": \"9b0c1d2e-3f4a-4b5c-6d7e-8f9a0b1c2d3e\", \"segment_name\": \"segment_5\"},\n", "]\n", - "input_df = spark.createDataFrame(sample_data, schema=input_schema)\n", + "\n", + "input_df = spark.createDataFrame(sample_data)\n", "display(input_df)" ] }, @@ -173,7 +191,7 @@ " result_df = client.push_data(\n", " 
df=input_df,\n", " context=context,\n", - " batch_size=1600,\n", + " batch_size=1600, # Number of rows batched together in a single request to The Trade Desk\n", " )\n", " display(result_df)\n", "except TTDSchemaValidationError as e:\n", @@ -226,16 +244,45 @@ "metadata": {}, "outputs": [], "source": [ - "# Create tables with default names and managed storage\n", + "# Creates three managed Delta tables in the active catalog/database.\n", + "# Default names: ttd_advertiser_input, ttd_advertiser_output, ttd_metadata\n", + "# Pass table_name= and location= to use custom names or external storage.\n", "input_table = client.setup_input_table(endpoint=TTDEndpoint.ADVERTISER)\n", "output_table = client.setup_output_table(endpoint=TTDEndpoint.ADVERTISER)\n", - "metadata_table = client.setup_metadata_table()\n", + "metadata_table = client.setup_metadata_table(table_name=\"ttd_advertiser_metadata\")\n", "\n", "print(f\"Input table: {input_table}\")\n", "print(f\"Output table: {output_table}\")\n", "print(f\"Metadata table: {metadata_table}\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import functions as F\n", + "\n", + "# This cell simulates your upstream pipeline writing records to the input table\n", + "# The user is responsible to write into the input table, the SDK only performs reads from the table\n", + "\n", + "\n", + "# updated_at is required for incremental processing: batch_process uses it\n", + "# to filter rows added since the last run when process_new_records_only=True\n", + "# The user is responsible to set the updated_at value for entries in the input table\n", + "\n", + "(\n", + " spark.createDataFrame(sample_data)\n", + " .withColumn(\"updated_at\", F.current_timestamp())\n", + " .write.format(\"delta\")\n", + " .mode(\"append\")\n", + " .saveAsTable(input_table)\n", + ")\n", + "\n", + "display(spark.table(input_table))" + ] + }, { "cell_type": "code", "execution_count": 
null, @@ -243,16 +290,32 @@ "outputs": [], "source": [ "# Run batch processing (reads from input_table, writes to output_table)\n", + "\n", + "# process_new_records_only=True filters to rows where updated_at > last run date\n", + "# On the first run, metadata_table is empty so all rows are processed\n", "client.batch_process(\n", " context=context,\n", " input_table=input_table,\n", " output_table=output_table,\n", " metadata_table=metadata_table,\n", - " process_new_records_only=True, # incremental: only rows newer than last run\n", - " batch_size=1600,\n", - " parallelism=16,\n", + " process_new_records_only=True, # Processes rows updated after last run; processes all rows on first run\n", + " batch_size=1600, # Number of rows grouped together in a single request to The Trade Desk\n", + " parallelism=16, # Number of parallel workers processing the entries from the input table\n", ")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Display the output table\n", + "display(spark.table(output_table))\n", + "\n", + "# Display the metadata table\n", + "display(spark.table(metadata_table))" ] } ], "metadata": { diff --git a/pyproject.toml b/pyproject.toml index 90a46ee..9aff10d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,11 @@ ignore = [ [tool.ruff.lint.isort] known-first-party = ["ttd_databricks_python"] +[tool.pytest.ini_options] +markers = [ + "spark: marks tests that require a running SparkSession (deselect with '-m \"not spark\"')", +] + [tool.ruff.format] quote-style = "double" indent-style = "space" diff --git a/tests/unit/test_batch_process_early_exit.py b/tests/unit/test_batch_process_early_exit.py index 3e8d025..04c1085 100644 --- a/tests/unit/test_batch_process_early_exit.py +++ b/tests/unit/test_batch_process_early_exit.py @@ -19,6 +19,8 @@ from ttd_databricks_python.ttd_databricks.exceptions import TTDConfigurationError from 
ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient +pytestmark = pytest.mark.spark + _CONTEXT = AdvertiserContext(advertiser_id="adv123") _REQUIRED_SCHEMA = StructType( [ diff --git a/tests/unit/test_client_helpers.py b/tests/unit/test_client_helpers.py index d444f6e..30200d4 100644 --- a/tests/unit/test_client_helpers.py +++ b/tests/unit/test_client_helpers.py @@ -13,6 +13,8 @@ from ttd_databricks_python.ttd_databricks.schemas import get_metadata_schema from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient +pytestmark = pytest.mark.spark + def _make_client(**kwargs) -> TtdDatabricksClient: # type: ignore[no-untyped-def] return TtdDatabricksClient( diff --git a/tests/unit/test_push_data.py b/tests/unit/test_push_data.py index 2a1f0d5..99858d8 100644 --- a/tests/unit/test_push_data.py +++ b/tests/unit/test_push_data.py @@ -16,6 +16,8 @@ from ttd_databricks_python.ttd_databricks.exceptions import TTDSchemaValidationError from ttd_databricks_python.ttd_databricks.ttd_client import TtdDatabricksClient +pytestmark = pytest.mark.spark + _CONTEXT = AdvertiserContext(advertiser_id="adv123") _REQUIRED_SCHEMA = StructType( [