Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added
- Added `--schema-name` option to `test` command to test a specific schema instead of all schemas (#1079 @kelsoufi-sanofi)

### Fixed
- Fix Protobuf export for arrays of objects and improve message/enum naming to UpperCamelCase (#1012 @Schokuroff)

Expand Down
5 changes: 5 additions & 0 deletions datacontract/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ def test(
"servers (default)."
),
] = "all",
schema_name: Annotated[
str,
typer.Option(help="The name of the schema to test, e.g., `orders`, or `all` for all schemas (default)."),
] = "all",
publish_test_results: Annotated[
bool, typer.Option(help="Deprecated. Use publish parameter. Publish the results after the test")
] = False,
Expand Down Expand Up @@ -198,6 +202,7 @@ def test(
publish_test_results=publish_test_results,
publish_url=publish,
server=server,
schema_name=schema_name,
ssl_verification=ssl_verification,
).test()
Comment thread
kelsoufi-sanofi marked this conversation as resolved.
if logs:
Expand Down
6 changes: 5 additions & 1 deletion datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
data_contract: OpenDataContractStandard = None,
schema_location: str = None,
server: str = None,
schema_name: str = "all",
publish_url: str = None,
spark: "SparkSession" = None,
duckdb_connection: "DuckDBPyConnection" = None,
Expand All @@ -40,6 +41,7 @@ def __init__(
self._data_contract = data_contract
self._schema_location = schema_location
self._server = server
self._schema_name = schema_name
self._publish_url = publish_url
self._publish_test_results = publish_test_results
self._spark = spark
Expand Down Expand Up @@ -120,7 +122,9 @@ def test(self) -> Run:
inline_definitions=self._inline_definitions,
)

execute_data_contract_test(data_contract, run, self._server, self._spark, self._duckdb_connection)
execute_data_contract_test(
data_contract, run, self._server, self._spark, self._duckdb_connection, schema_name=self._schema_name
)

except DataContractException as e:
run.checks.append(
Expand Down
4 changes: 3 additions & 1 deletion datacontract/engines/data_contract_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ def _get_schema_custom_property_value(schema: SchemaObject, key: str) -> Optiona
return None


def create_checks(data_contract: OpenDataContractStandard, server: Server) -> List[Check]:
def create_checks(data_contract: OpenDataContractStandard, server: Server, schema_name: str = "all") -> List[Check]:
checks: List[Check] = []
if data_contract.schema_ is None:
return checks
for schema_obj in data_contract.schema_:
if schema_name != "all" and schema_obj.name != schema_name:
continue
schema_checks = to_schema_checks(schema_obj, server)
checks.extend(schema_checks)
checks.extend(to_servicelevel_checks(data_contract))
Expand Down
18 changes: 15 additions & 3 deletions datacontract/engines/data_contract_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def execute_data_contract_test(
server_name: str = None,
spark: "SparkSession" = None,
duckdb_connection: "DuckDBPyConnection" = None,
schema_name: str = "all",
):
if data_contract.schema_ is None or len(data_contract.schema_) == 0:
raise DataContractException(
Expand All @@ -46,16 +47,27 @@ def execute_data_contract_test(
run.outputPortId = None # ODCS doesn't have outputPortId
run.server = server_name

if schema_name != "all":
schema_names = {s.name for s in data_contract.schema_} if data_contract.schema_ else set()
if schema_name not in schema_names:
raise DataContractException(
type="lint",
name="Check that schema name exists",
result=ResultEnum.failed,
reason=f"Schema '{schema_name}' not found in data contract. Available schemas: {sorted(schema_names)}",
engine="datacontract",
)

if server.type == "api":
server = process_api_response(run, server)

run.checks.extend(create_checks(data_contract, server))
run.checks.extend(create_checks(data_contract, server, schema_name=schema_name))

# TODO check server is supported type for nicer error messages
# TODO check server credentials are complete for nicer error messages
if server.format == "json" and server.type != "kafka":
check_jsonschema(run, data_contract, server)
check_soda_execute(run, data_contract, server, spark, duckdb_connection)
check_jsonschema(run, data_contract, server, schema_name=schema_name)
check_soda_execute(run, data_contract, server, spark, duckdb_connection, schema_name=schema_name)


def get_server(data_contract: OpenDataContractStandard, server_name: str = None) -> Server | None:
Expand Down
4 changes: 3 additions & 1 deletion datacontract/engines/fastjsonschema/check_jsonschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def process_s3_file(run, server, schema, model_name, validate):
process_exceptions(run, exceptions)


def check_jsonschema(run: Run, data_contract: OpenDataContractStandard, server: Server):
def check_jsonschema(run: Run, data_contract: OpenDataContractStandard, server: Server, schema_name: str = "all"):
run.log_info("Running engine jsonschema")

# Early exit conditions
Expand All @@ -253,6 +253,8 @@ def check_jsonschema(run: Run, data_contract: OpenDataContractStandard, server:
return

for schema_obj in data_contract.schema_:
if schema_name != "all" and schema_obj.name != schema_name:
continue
model_name = schema_obj.name
# Process the model
run.log_info(f"jsonschema: Converting model {model_name} to JSON Schema")
Expand Down
3 changes: 2 additions & 1 deletion datacontract/engines/soda/check_soda_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def check_soda_execute(
server: Server,
spark: "SparkSession" = None,
duckdb_connection: "DuckDBPyConnection" = None,
schema_name: str = "all",
):
from soda.common.config_helper import ConfigHelper

Expand All @@ -47,7 +48,7 @@ def check_soda_execute(
if server.type in ["s3", "gcs", "azure", "local"]:
if server.format in ["json", "parquet", "csv", "delta"]:
run.log_info(f"Configuring engine soda-core to connect to {server.type} {server.format} with duckdb")
con = get_duckdb_connection(data_contract, server, run, duckdb_connection)
con = get_duckdb_connection(data_contract, server, run, duckdb_connection, schema_name=schema_name)
scan.add_duckdb_connection(duckdb_connection=con, data_source_name=server.type)
scan.set_data_source_name(server.type)
else:
Expand Down
3 changes: 3 additions & 0 deletions datacontract/engines/soda/connections/duckdb_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def get_duckdb_connection(
server: Server,
run: Run,
duckdb_connection: "duckdb.DuckDBPyConnection | None" = None,
schema_name: str = "all",
) -> "duckdb.DuckDBPyConnection":
duckdb = _import_duckdb()
if duckdb_connection is None:
Expand All @@ -49,6 +50,8 @@ def get_duckdb_connection(
if data_contract.schema_:
for schema_obj in data_contract.schema_:
model_name = schema_obj.name
if schema_name != "all" and model_name != schema_name:
continue
model_path = path
if "{model}" in model_path:
model_path = model_path.format(model=model_name)
Expand Down
10 changes: 10 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import re

from typer.testing import CliRunner

from datacontract.cli import app
Expand All @@ -18,6 +20,14 @@ def test_file_does_not_exist():
assert "The file 'unknown.yaml' does not \nexist." in result.stdout


def test_test_schema_name_option_in_help():
    """The `test` command help text must advertise the --schema-name option."""
    # Wide COLUMNS keeps Typer/Rich from wrapping the option name across lines.
    cli_result = runner.invoke(app, ["test", "--help"], env={"COLUMNS": "200"})
    assert cli_result.exit_code == 0
    # Strip ANSI SGR color codes so the substring match is robust to styling.
    stripped = re.sub(r"\x1b\[[0-9;]*m", "", cli_result.stdout)
    assert "--schema-name" in stripped


def test_changelog_help():
result = runner.invoke(app, ["changelog", "--help"])
assert result.exit_code == 0
Expand Down
93 changes: 92 additions & 1 deletion tests/test_data_contract_checks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import yaml
from open_data_contract_standard.model import DataQuality, Server
from open_data_contract_standard.model import DataQuality, OpenDataContractStandard, Server

from datacontract.data_contract import DataContract
from datacontract.engines.data_contract_checks import (
Expand All @@ -13,6 +13,7 @@
check_property_required,
check_property_type,
check_property_unique,
create_checks,
prepare_query,
to_schema_checks,
)
Expand Down Expand Up @@ -229,6 +230,96 @@ def test_check_property_is_present_duckdb_hyphenated_model_name():
assert schema_check["fail"]["when required column missing"] == ["name"]


def _make_multi_schema_contract() -> OpenDataContractStandard:
    """Build a two-schema (orders, line_items) contract fixture for schema_name filtering tests."""
    orders_schema = {
        "name": "orders",
        "properties": [
            {"name": "order_id", "logicalType": "string", "physicalType": "string", "required": True},
            {"name": "amount", "logicalType": "integer", "physicalType": "integer"},
        ],
    }
    line_items_schema = {
        "name": "line_items",
        "properties": [
            {"name": "line_item_id", "logicalType": "string", "physicalType": "string", "required": True},
            {"name": "order_id", "logicalType": "string", "physicalType": "string"},
        ],
    }
    contract_fields = {
        "apiVersion": "v3.1.0",
        "kind": "DataContract",
        "id": "test-schema-filter",
        "name": "Test Schema Filter",
        "version": "1.0.0",
        "status": "active",
        "schema": [orders_schema, line_items_schema],
    }
    return OpenDataContractStandard(**contract_fields)


def test_create_checks_schema_name_all():
    """Passing schema_name='all' explicitly must yield checks covering every schema."""
    checks = create_checks(_make_multi_schema_contract(), Server(type="postgres"), schema_name="all")

    covered_models = {check.model for check in checks if check.model is not None}
    assert "orders" in covered_models
    assert "line_items" in covered_models


def test_create_checks_schema_name_default():
    """With schema_name omitted, the default ('all') must cover every schema."""
    checks = create_checks(_make_multi_schema_contract(), Server(type="postgres"))

    covered_models = {check.model for check in checks if check.model is not None}
    assert "orders" in covered_models
    assert "line_items" in covered_models


def test_create_checks_schema_name_filter_orders():
    """Filtering on 'orders' must keep orders checks and drop line_items checks."""
    checks = create_checks(_make_multi_schema_contract(), Server(type="postgres"), schema_name="orders")

    covered_models = {check.model for check in checks if check.model is not None}
    assert "orders" in covered_models
    assert "line_items" not in covered_models


def test_create_checks_schema_name_filter_line_items():
    """Filtering on 'line_items' must keep line_items checks and drop orders checks."""
    checks = create_checks(_make_multi_schema_contract(), Server(type="postgres"), schema_name="line_items")

    covered_models = {check.model for check in checks if check.model is not None}
    assert "line_items" in covered_models
    assert "orders" not in covered_models


def test_create_checks_schema_name_nonexistent():
    """An unknown schema_name must filter out every schema-level check."""
    checks = create_checks(_make_multi_schema_contract(), Server(type="postgres"), schema_name="nonexistent")

    covered_models = {check.model for check in checks if check.model is not None}
    assert "orders" not in covered_models
    assert "line_items" not in covered_models


def test_field_and_model_names_have_backticks_in_quality_bigquery():
"""Test that field and model names are encapsulated with backticks for BigQuery servers in quality checks"""
data_contract = DataContract(data_contract_file="fixtures/bigquery/datacontract_with_quality_rules.odcs.yaml")
Expand Down