
Commit d770f6c

Merge branch 'datacontract:main' into main

2 parents 05b3c83 + bf47fea

10 files changed

Lines changed: 288 additions & 5 deletions


CHANGELOG.md

Lines changed: 5 additions & 1 deletion
@@ -7,9 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+
+## [0.10.34] - 2025-08-06
+
 ### Added
 
-- `datacontract test` now supports testing HTTP APIs.
+- `datacontract test` now supports HTTP APIs.
+- `datacontract test` now supports Athena.
 
 ### Fixed

README.md

Lines changed: 49 additions & 2 deletions
@@ -161,6 +161,14 @@ if not run.has_passed():
 
 Choose the most appropriate installation method for your needs:
 
+### uv
+
+If you have [uv](https://docs.astral.sh/uv/) installed, you can run datacontract-cli directly without installing:
+
+```
+uv run --with 'datacontract-cli[all]' datacontract --version
+```
+
 ### pip
 Python 3.10, 3.11, and 3.12 are supported. We recommend to use Python 3.11.

@@ -222,6 +230,7 @@ A list of available extras:
 
 | Dependency              | Installation Command                        |
 |-------------------------|---------------------------------------------|
+| Amazon Athena           | `pip install datacontract-cli[athena]`      |
 | Avro Support            | `pip install datacontract-cli[avro]`        |
 | Google BigQuery         | `pip install datacontract-cli[bigquery]`    |
 | Databricks Integration  | `pip install datacontract-cli[databricks]`  |
@@ -366,6 +375,7 @@ Credentials are provided with environment variables.
 Supported server types:
 
 - [s3](#S3)
+- [athena](#athena)
 - [bigquery](#bigquery)
 - [azure](#azure)
 - [sqlserver](#sqlserver)
@@ -436,6 +446,41 @@ servers:
 | `DATACONTRACT_S3_SESSION_TOKEN` | `AQoDYXdzEJr...` | AWS temporary session token (optional) |
 
 
+#### Athena
+
+Data Contract CLI can test data in AWS Athena stored in S3.
+Supports different file formats, such as Iceberg, Parquet, JSON, CSV...
+
+##### Example
+
+datacontract.yaml
+```yaml
+servers:
+  athena:
+    type: athena
+    catalog: awsdatacatalog # awsdatacatalog is the default setting
+    schema: icebergdemodb # in Athena, this is called "database"
+    regionName: eu-central-1
+    stagingDir: s3://my-bucket/athena-results/
+models:
+  my_table: # corresponds to a table or view name
+    type: table
+    fields:
+      my_column_1: # corresponds to a column
+        type: string
+        config:
+          physicalType: varchar
+```
+
+##### Environment Variables
+
+| Environment Variable                | Example                         | Description                             |
+|-------------------------------------|---------------------------------|-----------------------------------------|
+| `DATACONTRACT_S3_REGION`            | `eu-central-1`                  | Region of Athena service                |
+| `DATACONTRACT_S3_ACCESS_KEY_ID`     | `AKIAXV5Q5QABCDEFGH`            | AWS Access Key ID                       |
+| `DATACONTRACT_S3_SECRET_ACCESS_KEY` | `93S7LRrJcqLaaaa/XXXXXXXXXXXXX` | AWS Secret Access Key                   |
+| `DATACONTRACT_S3_SESSION_TOKEN`     | `AQoDYXdzEJr...`                | AWS temporary session token (optional)  |
+
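Once the server block above is in place, testing it from Python mirrors the library example earlier in the README. A minimal sketch, assuming the `datacontract.yaml` above sits in the working directory and the `DATACONTRACT_S3_*` variables are exported:

```python
from datacontract.data_contract import DataContract

# "athena" is the server key from the servers block in the example above.
data_contract = DataContract(data_contract_file="datacontract.yaml", server="athena")
run = data_contract.test()
if not run.has_passed():
    print("Data quality validation failed")
```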
 
 #### Google Cloud Storage (GCS)

@@ -898,8 +943,10 @@ models:
 │ --engine      TEXT  [engine] The engine used for great       │
 │                     expection run.                           │
 │                     [default: None]                          │
-│ --template    PATH  [custom] The file path of Jinja          │
-│                     template.                                │
+│ --template    PATH  The file path or URL of a template.      │
+│                     For Excel format: path/URL to custom     │
+│                     Excel template. For custom format:       │
+│                     path to Jinja template.                  │
 │                     [default: None]                          │
 │ --help              Show this message and exit.              │
 ╰───────────────────────────────────────────────────────────────╯

datacontract/engines/soda/check_soda_execute.py

Lines changed: 6 additions & 0 deletions
@@ -2,6 +2,8 @@
 import typing
 import uuid
 
+from datacontract.engines.soda.connections.athena import to_athena_soda_configuration
+
 if typing.TYPE_CHECKING:
     from pyspark.sql import SparkSession

@@ -106,6 +108,10 @@ def check_soda_execute(
         soda_configuration_str = to_trino_soda_configuration(server)
         scan.add_configuration_yaml_str(soda_configuration_str)
         scan.set_data_source_name(server.type)
+    elif server.type == "athena":
+        soda_configuration_str = to_athena_soda_configuration(server)
+        scan.add_configuration_yaml_str(soda_configuration_str)
+        scan.set_data_source_name(server.type)
 
     else:
         run.checks.append(

datacontract/engines/soda/connections/athena.py

Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+import os
+
+import yaml
+
+from datacontract.model.exceptions import DataContractException
+
+
+def to_athena_soda_configuration(server):
+    s3_region = os.getenv("DATACONTRACT_S3_REGION")
+    s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID")
+    s3_secret_access_key = os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY")
+    s3_session_token = os.getenv("DATACONTRACT_S3_SESSION_TOKEN")
+
+    # Validate required parameters
+    if not s3_access_key_id:
+        raise DataContractException(
+            type="athena-connection",
+            name="missing_access_key_id",
+            reason="AWS access key ID is required. Set the DATACONTRACT_S3_ACCESS_KEY_ID environment variable.",
+            engine="datacontract",
+        )
+
+    if not s3_secret_access_key:
+        raise DataContractException(
+            type="athena-connection",
+            name="missing_secret_access_key",
+            reason="AWS secret access key is required. Set the DATACONTRACT_S3_SECRET_ACCESS_KEY environment variable.",
+            engine="datacontract",
+        )
+
+    if not hasattr(server, "schema_") or not server.schema_:
+        raise DataContractException(
+            type="athena-connection",
+            name="missing_schema",
+            reason="Schema is required for Athena connection. Specify the schema where your tables exist in the server configuration.",
+            engine="datacontract",
+        )
+
+    if not hasattr(server, "stagingDir") or not server.stagingDir:
+        raise DataContractException(
+            type="athena-connection",
+            name="missing_s3_staging_dir",
+            reason="S3 staging directory is required for Athena connection. This should be the Amazon S3 Query Result Location (e.g., 's3://my-bucket/athena-results/').",
+            engine="datacontract",
+        )
+
+    # Validate S3 staging directory format
+    if not server.stagingDir.startswith("s3://"):
+        raise DataContractException(
+            type="athena-connection",
+            name="invalid_s3_staging_dir",
+            reason=f"S3 staging directory must start with 's3://'. Got: {server.stagingDir}. Example: 's3://my-bucket/athena-results/'",
+            engine="datacontract",
+        )
+
+    data_source = {
+        "type": "athena",
+        "access_key_id": s3_access_key_id,
+        "secret_access_key": s3_secret_access_key,
+        "schema": server.schema_,
+        "staging_dir": server.stagingDir,
+    }
+
+    if s3_region:
+        data_source["region_name"] = s3_region
+    elif server.region_name:
+        data_source["region_name"] = server.region_name
+
+    if server.catalog:
+        # Optional. Identifies the name of the data source, also referred to as a catalog. The default value is `awsdatacatalog`.
+        data_source["catalog"] = server.catalog
+
+    if s3_session_token:
+        data_source["aws_session_token"] = s3_session_token
+
+    soda_configuration = {f"data_source {server.type}": data_source}
+
+    soda_configuration_str = yaml.dump(soda_configuration)
+    return soda_configuration_str
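A quick sketch of what this helper emits (not part of the commit), using `types.SimpleNamespace` as a stand-in for the real `Server` model and placeholder credentials:

```python
import os
from types import SimpleNamespace

from datacontract.engines.soda.connections.athena import to_athena_soda_configuration

# Placeholder credentials; real values come from your AWS account.
os.environ["DATACONTRACT_S3_ACCESS_KEY_ID"] = "AKIA..."
os.environ["DATACONTRACT_S3_SECRET_ACCESS_KEY"] = "secret"

# SimpleNamespace stands in for the datacontract Server model.
server = SimpleNamespace(
    type="athena",
    schema_="icebergdemodb",
    stagingDir="s3://my-bucket/athena-results/",
    catalog="awsdatacatalog",
    region_name="eu-central-1",
)

print(to_athena_soda_configuration(server))
# Expected shape (yaml.dump sorts keys alphabetically):
# data_source athena:
#   access_key_id: AKIA...
#   catalog: awsdatacatalog
#   region_name: eu-central-1
#   schema: icebergdemodb
#   secret_access_key: secret
#   staging_dir: s3://my-bucket/athena-results/
#   type: athena
```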

datacontract/export/sql_type_converter.py

Lines changed: 4 additions & 0 deletions
@@ -3,6 +3,9 @@
 
 
 def convert_to_sql_type(field: Field, server_type: str) -> str:
+    if field.config and "physicalType" in field.config:
+        return field.config["physicalType"]
+
     if server_type == "snowflake":
         return convert_to_snowflake(field)
     elif server_type == "postgres":
@@ -19,6 +22,7 @@ def convert_to_sql_type(field: Field, server_type: str) -> str:
         return convert_type_to_bigquery(field)
     elif server_type == "trino":
         return convert_type_to_trino(field)
+
     return field.type
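The new guard gives an explicit `physicalType` in a field's `config` precedence over every per-server mapping. A sketch, assuming the `Field` model accepts these keyword arguments:

```python
from datacontract.export.sql_type_converter import convert_to_sql_type
from datacontract.model.data_contract_specification import Field

# The explicit physicalType wins, regardless of server type.
field = Field(type="string", config={"physicalType": "varchar"})
assert convert_to_sql_type(field, "athena") == "varchar"

# Without the override, the server-specific conversion applies as before.
plain = Field(type="string")
print(convert_to_sql_type(plain, "snowflake"))  # whatever convert_to_snowflake maps "string" to
```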

datacontract/imports/odcs_v3_importer.py

Lines changed: 1 addition & 0 deletions
@@ -131,6 +131,7 @@ def import_servers(odcs: OpenDataContractStandard) -> Dict[str, Server] | None:
         server.host = odcs_server.host
         server.port = odcs_server.port
         server.catalog = odcs_server.catalog
+        server.stagingDir = odcs_server.stagingDir
         server.topic = getattr(odcs_server, "topic", None)
         server.http_path = getattr(odcs_server, "http_path", None)
         server.token = getattr(odcs_server, "token", None)
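With this one-liner, a `stagingDir` declared on an ODCS server (as in the iceberg fixture below) survives the import. A sketch; the ODCS model class and its import path are assumptions based on the importer's signature:

```python
import yaml

# Assumed import path for the ODCS model used in the importer's type hints.
from open_data_contract_standard.model import OpenDataContractStandard

from datacontract.imports.odcs_v3_importer import import_servers

with open("tests/fixtures/athena-iceberg/iceberg_example.odcs.yaml") as f:
    odcs = OpenDataContractStandard(**yaml.safe_load(f))

servers = import_servers(odcs)
print(servers["athena"].stagingDir)  # s3://entropy-data-demo-athena-results-dfhsiuya/cli
```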

pyproject.toml

Lines changed: 6 additions & 2 deletions
@@ -1,6 +1,6 @@
 [project]
 name = "datacontract-cli"
-version = "0.10.33"
+version = "0.10.34"
 description = "The datacontract CLI is an open source command-line tool for working with Data Contracts. It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different formats. The tool is written in Python. It can be used as a standalone CLI tool, in a CI/CD pipeline, or directly as a Python library."
 license = "MIT"
 readme = "README.md"
@@ -92,6 +92,10 @@ sqlserver = [
     "soda-core-sqlserver>=3.3.20,<3.6.0"
 ]
 
+athena = [
+    "soda-core-athena>=3.3.20,<3.6.0"
+]
+
 trino = [
     "soda-core-trino>=3.3.20,<3.6.0"
 ]
@@ -122,7 +126,7 @@ protobuf = [
 ]
 
 all = [
-    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf]"
+    "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,iceberg,parquet,rdf,api,protobuf]"
 ]
 
 # for development, we pin all libraries to an exact version

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+Setup:
+
+# Create an S3 bucket for Iceberg data
+
+
+```
+s3://datacontract-iceberg-demo
+```
+
+# Create an S3 bucket for Athena Results
+
+```
+s3://entropy-data-demo-athena-results-dfhsiuya
+```
+
+# Create a Glue database
+
+In Athena run:
+```
+CREATE DATABASE icebergdemodb
+```
+
+# Create an Iceberg table
+In Athena run:
+```
+CREATE TABLE athena_iceberg_table_partitioned (
+  color string,
+  date string,
+  name string,
+  price bigint,
+  product string,
+  ts timestamp)
+PARTITIONED BY (day(ts))
+LOCATION 's3://datacontract-iceberg-demo/ice_warehouse/iceberg_db/athena_iceberg_table/'
+TBLPROPERTIES (
+  'table_type' ='ICEBERG'
+)
+```
+
+# Add some data to the Iceberg table
+
+In Athena run:
+```
+INSERT INTO "icebergdemodb"."athena_iceberg_table_partitioned" VALUES (
+  'red', '222022-07-19T03:47:29', 'PersonNew', 178, 'Tuna', now()
+)
+```
+
+# Add a new IAM user
+No permissions needed
+
+E.g. `datacontract-cli-unittests`
+
+# Create an Access Key for this IAM user
+
+Use type `other`
+Save them in .env file
+```
+DATACONTRACT_S3_ACCESS_KEY_ID=AKIA...
+DATACONTRACT_S3_SECRET_ACCESS_KEY=...
+```
+
+# Give permissions to the IAM user
+
+In Glue ->
+https://eu-central-1.console.aws.amazon.com/glue/home?region=eu-central-1#/v2/iam-permissions/select-users
+
+Select the S3 bucket
+
+Create the standard role `AWSGlueServiceRole`
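An optional sanity check before running the test suite (a sketch, not part of the commit) that the new key pair is usable, via boto3:

```python
import os

import boto3
from dotenv import load_dotenv

load_dotenv()

# Fails fast if the access key pair from the .env file is invalid.
athena = boto3.client(
    "athena",
    region_name="eu-central-1",
    aws_access_key_id=os.environ["DATACONTRACT_S3_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["DATACONTRACT_S3_SECRET_ACCESS_KEY"],
)
print([wg["Name"] for wg in athena.list_work_groups()["WorkGroups"]])
```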
tests/fixtures/athena-iceberg/iceberg_example.odcs.yaml

Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+apiVersion: v3.0.1
+kind: DataContract
+id: iceberg-example
+name: Iceberg Example
+version: 0.0.1
+status: active
+customProperties:
+  - property: owner
+    value: data--ai
+description: {}
+servers:
+  - server: athena
+    type: athena
+    description: Iceberg files on S3
+    catalog: awsdatacatalog # awsdatacatalog is the default catalog in Athena
+    schema: icebergdemodb # called database in Athena
+    regionName: eu-central-1
+    stagingDir: s3://entropy-data-demo-athena-results-dfhsiuya/cli
+schema:
+  - name: athena_iceberg_table_partitioned
+    logicalType: object
+    properties:
+      - name: color
+        logicalType: string
+        required: true
+        unique: true
+        physicalType: varchar
+      - name: date
+        logicalType: string
+        physicalType: varchar
+      - name: name
+        logicalType: string
+        physicalType: varchar
+      - name: price
+        logicalType: integer
+        physicalType: bigint
+      - name: product
+        logicalType: string
+        physicalType: varchar
+      - name: ts
+        logicalType: date
+        physicalType: timestamp(6)

tests/test_test_athena_iceberg.py

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+import logging
+import os
+
+import pytest
+from dotenv import load_dotenv
+
+from datacontract.data_contract import DataContract
+
+logging.basicConfig(level=logging.INFO, force=True)
+load_dotenv(override=True)
+datacontract = "fixtures/athena-iceberg/iceberg_example.odcs.yaml"
+
+
+@pytest.mark.skipif(
+    os.environ.get("DATACONTRACT_S3_ACCESS_KEY_ID") is None
+    or os.environ.get("DATACONTRACT_S3_SECRET_ACCESS_KEY") is None,
+    reason="Requires DATACONTRACT_S3_ACCESS_KEY_ID and DATACONTRACT_S3_SECRET_ACCESS_KEY to be set",
+)
+def test_test_athena_iceberg(monkeypatch):
+    data_contract = DataContract(data_contract_file=datacontract)
+
+    run = data_contract.test()
+
+    print(run.pretty())
+    assert run.result == "passed"
+    assert all(check.result == "passed" for check in run.checks)
