Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
600 changes: 597 additions & 3 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ include-groups = [
[tool.poetry.group.dev.dependencies]
commitizen = "4.9.1"
pre-commit = "4.3.0"
ipykernel = "^7.2.0"

[tool.poetry.group.test]
optional = true
Expand Down
5 changes: 1 addition & 4 deletions src/dve/core_engine/backends/base/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def apply(
return entities, get_parent(processing_errors_uri), successful

for entity_name, entity in entities.items():
entities[entity_name] = self.step_implementations.add_row_id(entity)
entities[entity_name] = self.step_implementations.add_record_index(entity)

# TODO: Handle entity manager creation errors.
entity_manager = EntityManager(entities, reference_data)
Expand All @@ -172,9 +172,6 @@ def apply(
# TODO: and return uri to errors
_ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)

for entity_name, entity in entity_manager.entities.items():
entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)

return entity_manager.entities, get_parent(dc_feedback_errors_uri), True

def process(
Expand Down
12 changes: 10 additions & 2 deletions src/dve/core_engine/backends/base/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ def read_raw_entities(
successful = True
for entity_name, resource in entity_locations.items():
reader_metadata = contract_metadata.reader_metadata[entity_name]
extension = "." + (
get_file_suffix(resource) or ""
extension = (
"." + (get_file_suffix(resource) or "").lower()
) # Already checked that extension supported.

reader_config = reader_metadata[extension]
Expand Down Expand Up @@ -369,6 +369,14 @@ def read_raw_entities(

return entities, dedup_messages(messages), successful

def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
    """Add a record index column to the entity.

    Optional hook with a raising default: concrete backends supply an
    implementation (e.g. the ``duckdb_record_index`` class decorator).
    ``**kwargs`` allows backend-specific options to pass through unchanged.

    Raises:
        NotImplementedError: always, unless overridden by the backend.
    """
    raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")

def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
    """Drop the record index column from the entity.

    Optional hook with a raising default: concrete backends supply an
    implementation (e.g. the ``duckdb_record_index`` class decorator).

    Raises:
        NotImplementedError: always, unless overridden by the backend.
    """
    raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")

@abstractmethod
def apply_data_contract(
self,
Expand Down
8 changes: 8 additions & 0 deletions src/dve/core_engine/backends/base/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ def read_to_entity_type(

return reader_func(self, resource, entity_name, schema)

def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
    """Add a record index column to the entity.

    Optional hook with a raising default; reader implementations that
    support record indexing override this (e.g. via a class decorator).

    Raises:
        NotImplementedError: always, unless overridden.
    """
    raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")

def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
    """Drop the record index column from the entity.

    Optional hook with a raising default; reader implementations that
    support record indexing override this (e.g. via a class decorator).

    Raises:
        NotImplementedError: always, unless overridden.
    """
    raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")

def write_parquet(
self,
entity: EntityType,
Expand Down
14 changes: 6 additions & 8 deletions src/dve/core_engine/backends/base/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,13 @@ def register_udfs(cls, **kwargs):
"""Method to register all custom dve functions for use during business rules application"""
raise NotImplementedError()

@staticmethod
def add_row_id(entity: EntityType) -> EntityType:
"""Add a unique row id field to an entity"""
raise NotImplementedError()
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
    """Add a record index column to the entity.

    Optional hook with a raising default; step-implementation backends
    override this (e.g. via the ``duckdb_record_index`` class decorator).

    Raises:
        NotImplementedError: always, unless overridden by the backend.
    """
    raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")

@staticmethod
def drop_row_id(entity: EntityType) -> EntityType:
"""Add a unique row id field to an entity"""
raise NotImplementedError()
def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
    """Drop the record index column from an entity.

    Optional hook with a raising default; step-implementation backends
    override this (e.g. via the ``duckdb_record_index`` class decorator).
    ``**kwargs`` added for signature consistency with ``add_record_index``
    and with the equivalent hooks on the contract/reader base classes —
    backward compatible for all existing callers.

    Raises:
        NotImplementedError: always, unless overridden by the backend.
    """
    raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")

@classmethod
def _raise_notimplemented_error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
duckdb_read_parquet,
duckdb_record_index,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
relation_is_empty,
Expand All @@ -37,6 +38,7 @@
from dve.core_engine.backends.metadata.contract import DataContractMetadata
from dve.core_engine.backends.types import StageSuccessful
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityLocations
from dve.core_engine.validation import RowValidator, apply_row_validator_helper
Expand All @@ -54,6 +56,7 @@
return row # no op


@duckdb_record_index
@duckdb_write_parquet
@duckdb_read_parquet
class DuckDBDataContract(BaseDataContract[DuckDBPyRelation]):
Expand Down Expand Up @@ -144,10 +147,12 @@
fld.name: get_duckdb_type_from_annotation(fld.annotation)
for fld in entity_fields.values()
}
ddb_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
polars_schema: dict[str, PolarsType] = {
fld.name: get_polars_type_from_annotation(fld.annotation)
for fld in entity_fields.values()
}
polars_schema[RECORD_INDEX_COLUMN_NAME] = get_polars_type_from_annotation(int)
if relation_is_empty(relation):
self.logger.warning(f"+ Empty relation for {entity_name}")
empty_df = pl.DataFrame([], schema=polars_schema) # type: ignore # pylint: disable=W0612
Expand All @@ -170,6 +175,9 @@

self.logger.info(f"Data contract found {msg_count} issues in {entity_name}")

if not RECORD_INDEX_COLUMN_NAME in relation.columns:

Check warning on line 178 in src/dve/core_engine/backends/implementations/duckdb/contract.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use the opposite operator ("not in") instead.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZzEIDBR2vaqiWb_TC_t&open=AZzEIDBR2vaqiWb_TC_t&pullRequest=57
relation = self.add_record_index(relation)

casting_statements = [
(
self.generate_ddb_cast_statement(column, dtype)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@

import duckdb.typing as ddbtyp
import numpy as np
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression
from duckdb.typing import DuckDBPyType
from pandas import DataFrame
from pydantic import BaseModel
from typing_extensions import Annotated, get_args, get_origin, get_type_hints

from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI
from dve.parser.file_handling.service import LocalFilesystemImplementation, _get_implementation

Expand Down Expand Up @@ -286,3 +287,29 @@ def duckdb_rel_to_dictionaries(
cols: tuple[str] = tuple(entity.columns) # type: ignore
while rows := entity.fetchmany(batch_size):
yield from (dict(zip(cols, rw)) for rw in rows)


def _add_duckdb_record_index(
    self, entity: DuckDBPyRelation  # pylint: disable=W0613
) -> DuckDBPyRelation:
    """Attach a record-index column to a DuckDB relation.

    Idempotent: when the column is already present the relation is
    returned untouched.
    """
    already_indexed = RECORD_INDEX_COLUMN_NAME in entity.columns
    if already_indexed:
        return entity
    # NOTE(review): row_number() OVER () has no ORDER BY, so the index
    # reflects the relation's current scan order — confirm this is intended.
    indexed = entity.select(f"*, row_number() OVER () as {RECORD_INDEX_COLUMN_NAME}")
    return indexed


def _drop_duckdb_record_index(
    self, entity: DuckDBPyRelation  # pylint: disable=W0613
) -> DuckDBPyRelation:
    """Remove the record-index column from a DuckDB relation, if present.

    Relations without the column are returned untouched.
    """
    has_index = RECORD_INDEX_COLUMN_NAME in entity.columns
    if not has_index:
        return entity
    return entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME]))


def duckdb_record_index(cls):
    """Class decorator that installs the DuckDB record-index hooks.

    Binds ``add_record_index`` / ``drop_record_index`` onto *cls* so
    DuckDB-backed implementations share a single pair of helpers.
    """
    cls.add_record_index = _add_duckdb_record_index
    cls.drop_record_index = _drop_duckdb_record_index
    return cls
30 changes: 23 additions & 7 deletions src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,32 @@

import duckdb as ddb
import polars as pl
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv
from duckdb import (
DuckDBPyConnection,
DuckDBPyRelation,
StarExpression,
default_connection,
read_csv,
)
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
duckdb_record_index,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, polars_record_index
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length


@duckdb_record_index
@duckdb_write_parquet
class DuckDBCSVReader(BaseFileReader):
"""A reader for CSV files including the ability to compare the passed model
Expand Down Expand Up @@ -109,9 +118,10 @@
}

reader_options["columns"] = ddb_schema
return read_csv(resource, **reader_options)
return self.add_record_index(read_csv(resource, **reader_options, parallel=False))


@polars_record_index
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
"""
Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.
Expand Down Expand Up @@ -145,7 +155,11 @@

# there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85
# redundant
df = pl.scan_csv(resource, **reader_options).select(list(polars_types.keys())) # type: ignore # pylint: disable=W0612
df = self.add_record_index( # pylint: disable=W0612

Check warning on line 158 in src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused local variable "df".

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZzEIC-V2vaqiWb_TC_s&open=AZzEIC-V2vaqiWb_TC_s&pullRequest=57
pl.scan_csv(resource, **reader_options).select( # type: ignore
list(polars_types.keys())
)
)

return ddb.sql("SELECT * FROM df")

Expand Down Expand Up @@ -189,8 +203,10 @@
def read_to_relation( # pylint: disable=unused-argument
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
) -> DuckDBPyRelation:
entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
entity = entity.distinct()
entity: DuckDBPyRelation = super().read_to_relation(
resource=resource, entity_name=entity_name, schema=schema
)
entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct()
no_records = entity.shape[0]

if no_records != 1:
Expand Down Expand Up @@ -219,4 +235,4 @@
],
)

return entity
return entity.select(f"*, 1 as {RECORD_INDEX_COLUMN_NAME}")
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
duckdb_record_index,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.type_hints import URI, EntityName


@duckdb_record_index
@duckdb_write_parquet
class DuckDBJSONReader(BaseFileReader):
"""A reader for JSON files"""
Expand Down Expand Up @@ -47,4 +49,6 @@ def read_to_relation( # pylint: disable=unused-argument
for fld in schema.__fields__.values()
}

return read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
return self.add_record_index(
read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
)
13 changes: 10 additions & 3 deletions src/dve/core_engine/backends/implementations/duckdb/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.backends.utilities import (
get_polars_type_from_annotation,
polars_record_index,
stringify_model,
)
from dve.core_engine.type_hints import URI


@polars_record_index
@duckdb_write_parquet
class DuckDBXMLStreamReader(XMLStreamReader):
"""A reader for XML files"""
Expand All @@ -39,7 +44,9 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
for fld in stringify_model(schema).__fields__.values()
}

_lazy_frame = pl.LazyFrame(
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
_lazy_frame = self.add_record_index(
pl.LazyFrame(
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
)
)
return self.ddb_connection.sql("select * from _lazy_frame")
17 changes: 2 additions & 15 deletions src/dve/core_engine/backends/implementations/duckdb/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
DDBStruct,
duckdb_read_parquet,
duckdb_record_index,
duckdb_rel_to_dictionaries,
duckdb_write_parquet,
get_all_registered_udfs,
Expand Down Expand Up @@ -51,13 +52,13 @@
SemiJoin,
TableUnion,
)
from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.functions import implementations as functions
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import Messages


@duckdb_record_index
@duckdb_write_parquet
@duckdb_read_parquet
class DuckDBStepImplementations(BaseStepImplementations[DuckDBPyRelation]):
Expand Down Expand Up @@ -106,20 +107,6 @@ def register_udfs( # type: ignore
connection.sql(_sql)
return cls(connection=connection, **kwargs)

@staticmethod
def add_row_id(entity: DuckDBPyRelation) -> DuckDBPyRelation:
"""Adds a row identifier to the Relation"""
if ROWID_COLUMN_NAME not in entity.columns:
entity = entity.project(f"*, ROW_NUMBER() OVER () as {ROWID_COLUMN_NAME}")
return entity

@staticmethod
def drop_row_id(entity: DuckDBPyRelation) -> DuckDBPyRelation:
"""Drops the row identiifer from a Relation"""
if ROWID_COLUMN_NAME in entity.columns:
entity = entity.select(StarExpression(exclude=[ROWID_COLUMN_NAME]))
return entity

def add(self, entities: DuckDBEntities, *, config: ColumnAddition) -> Messages:
"""A transformation step which adds a column to an entity."""
entity: DuckDBPyRelation = entities[config.entity_name]
Expand Down
4 changes: 2 additions & 2 deletions src/dve/core_engine/backends/implementations/spark/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dve.core_engine.backends.implementations.spark.rules import SparkStepImplementations
from dve.core_engine.backends.implementations.spark.spark_helpers import get_type_from_annotation
from dve.core_engine.backends.implementations.spark.types import SparkEntities
from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.loggers import get_child_logger, get_logger
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, EntityParquetLocations
Expand Down Expand Up @@ -58,7 +58,7 @@ def write_entities_to_parquet(
locations = {}
self.logger.info(f"Writing entities to the output location: {cache_prefix}")
for entity_name, entity in entities.items():
entity = entity.drop(ROWID_COLUMN_NAME)
entity = entity.drop(RECORD_INDEX_COLUMN_NAME)

self.logger.info(f"Entity: {entity_name}")

Expand Down
Loading