Commit ab8467a

Merge pull request #33 from NHSDigital/bugfix/gr-ndit-850-improve_logging_around_dve_processing_errors
2 parents 7110ef6 + 8aa4973 commit ab8467a

12 files changed: 124 additions & 43 deletions


Makefile

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ behave:
 	${activate} behave
 
 pytest:
-	${activate} pytest tests/
+	${activate} pytest -c pytest-dev.ini
 
 all-tests: pytest behave

pytest-dev.ini

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+[pytest]
+log_cli = true
+log_cli_level = INFO
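
With log_cli enabled at INFO, pytest streams log records to the terminal as tests run instead of capturing them silently, which is what makes the improved pipeline logging visible during development. A minimal sketch of the effect (the test and logger names are hypothetical):

    # test_log_visibility.py -- hypothetical test showing live log output.
    import logging

    logger = logging.getLogger(__name__)


    def test_emits_visible_log():
        # With log_cli = true and log_cli_level = INFO, this record is
        # printed live in the pytest terminal output.
        logger.info("processing submission xyz")

Running the updated target (make pytest) picks up pytest-dev.ini through the -c flag.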

src/dve/core_engine/exceptions.py

Lines changed: 4 additions & 16 deletions
@@ -1,12 +1,8 @@
 """Exceptions emitted by the pipeline."""
 
-from collections.abc import Iterator
+import traceback
 from typing import Optional
 
-from dve.core_engine.backends.implementations.spark.types import SparkEntities
-from dve.core_engine.message import FeedbackMessage
-from dve.core_engine.type_hints import Messages
-
 
 class CriticalProcessingError(ValueError):
     """An exception emitted if critical errors are received."""
@@ -15,26 +11,18 @@ def __init__(
         self,
         error_message: str,
         *args: object,
-        messages: Optional[Messages],
-        entities: Optional[SparkEntities] = None
+        messages: Optional[list[str]] = None,
     ) -> None:
         super().__init__(error_message, *args)
         self.error_message = error_message
         """The error message explaining the critical processing error."""
         self.messages = messages
-        """The messages gathered at the time the error was emitted."""
-        self.entities = entities
-        """The entities as they exist at the time the error was emitted."""
-
-    @property
-    def critical_messages(self) -> Iterator[FeedbackMessage]:
-        """Critical messages which caused the processing error."""
-        yield from filter(lambda message: message.is_critical, self.messages)  # type: ignore
+        """The stacktrace for the messages."""
 
     @classmethod
     def from_exception(cls, exc: Exception):
        """Create from broader exception, for recording in processing errors"""
-        return cls(error_message=repr(exc), entities=None, messages=[])
+        return cls(error_message=repr(exc), messages=traceback.format_exception(exc))
 
 
 class EntityTypeMismatch(TypeError):
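
With from_exception now capturing traceback.format_exception(exc), a wrapped error carries its full stacktrace into the processing-error records. A minimal sketch of the behaviour (assumes Python 3.10+, where format_exception accepts just the exception object):

    from dve.core_engine.exceptions import CriticalProcessingError

    try:
        raise ValueError("bad input")
    except ValueError as exc:
        err = CriticalProcessingError.from_exception(exc)

    # error_message is the repr of the original exception...
    assert err.error_message == "ValueError('bad input')"
    # ...and messages holds the formatted traceback lines.
    assert err.messages[-1] == "ValueError: bad input\n"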

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,6 @@
 """DuckDB implementation for `Pipeline` object."""
 
+import logging
 from typing import Optional
 
 from duckdb import DuckDBPyConnection, DuckDBPyRelation
@@ -30,6 +31,7 @@ def __init__(
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._connection = connection
         super().__init__(
@@ -41,6 +43,7 @@ def __init__(
             submitted_files_path,
             reference_data_loader,
             job_run_id,
+            logger,
         )
 
     # pylint: disable=arguments-differ

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 4 additions & 8 deletions
@@ -57,8 +57,7 @@ def file_transformation(
         try:
             return super().file_transformation(submission_info)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"File transformation raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("File transformation raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "file_transformation",
@@ -73,8 +72,7 @@ def apply_data_contract(
         try:
             return super().apply_data_contract(submission_info, submission_status)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"Apply data contract raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Apply data contract raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "contract",
@@ -89,8 +87,7 @@ def apply_business_rules(
         try:
             return super().apply_business_rules(submission_info, submission_status)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"Apply business rules raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Apply business rules raised exception:")
             dump_processing_errors(
                 fh.joinuri(self.processed_files_path, submission_info.submission_id),
                 "business_rules",
@@ -105,8 +102,7 @@ def error_report(
         try:
             return super().error_report(submission_info, submission_status)
         except Exception as exc:  # pylint: disable=W0718
-            self._logger.error(f"Error reports raised exception: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Error reports raised exception:")
             sub_stats = None
             report_uri = None
             submission_status = submission_status if submission_status else SubmissionStatus()
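
Collapsing each error/exception pair into a single call works because logging.Logger.exception already logs at ERROR level and appends the active exception's traceback. A standalone sketch of the pattern:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("dve.pipeline")

    try:
        1 / 0
    except Exception:  # broad handler, mirroring the pipeline's
        # One call emits the ERROR record plus the traceback; the old
        # logger.error(...) line duplicated the message without adding detail.
        logger.exception("File transformation raised exception:")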

src/dve/pipeline/pipeline.py

Lines changed: 16 additions & 16 deletions
@@ -1,6 +1,7 @@
 # pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long
 """Generic Pipeline object to define how DVE should be interacted with."""
 import json
+import logging
 import re
 from collections import defaultdict
 from collections.abc import Generator, Iterable, Iterator
@@ -57,6 +58,7 @@ def __init__(
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._submitted_files_path = submitted_files_path
         self._processed_files_path = processed_files_path
@@ -66,11 +68,16 @@ def __init__(
         self._audit_tables = audit_tables
         self._data_contract = data_contract
         self._step_implementations = step_implementations
-        self._logger = get_logger(__name__)
+        self._logger = logger or get_logger(__name__)
         self._summary_lock = Lock()
         self._rec_tracking_lock = Lock()
         self._aggregates_lock = Lock()
 
+        if self._data_contract:
+            self._data_contract.logger = self._logger
+        if self._step_implementations:
+            self._step_implementations.logger = self._logger
+
     @property
     def job_run_id(self) -> Optional[int]:
         """Unique Identifier for the job/process that is running this Pipeline."""
@@ -244,8 +251,7 @@ def audit_received_file_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"audit_received_file raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("audit_received_file raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, submission_id),
                     "audit_received",
@@ -301,8 +307,7 @@ def file_transformation(
             )
 
         except MessageBearingError as exc:
-            self._logger.error(f"Unexpected file transformation error: {exc}")
-            self._logger.exception(exc)
+            self._logger.exception("Unexpected file transformation error:")
             errors.extend(exc.messages)
 
         if errors:
@@ -352,8 +357,7 @@ def file_transformation_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"File transformation raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("File transformation raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "file_transformation",
@@ -478,8 +482,7 @@ def data_contract_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Data Contract raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Data Contract raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "contract",
@@ -644,8 +647,7 @@ def business_rule_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Business Rules raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Business Rules raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "business_rules",
@@ -704,9 +706,8 @@ def _get_error_dataframes(self, submission_id: str):
             errors = None
             try:
                 errors = json.load(f)
-            except UnicodeDecodeError as exc:
-                self._logger.error(f"Error reading file: {file}")
-                self._logger.exception(exc)
+            except UnicodeDecodeError:
+                self._logger.exception(f"Error reading file: {file}")
                 continue
             if not errors:
                 continue
@@ -845,8 +846,7 @@ def error_report_step(
                 )
                 continue
             except Exception as exc:  # pylint: disable=W0703
-                self._logger.error(f"Error reports raised exception: {exc}")
-                self._logger.exception(exc)
+                self._logger.exception("Error reports raised exception:")
                 dump_processing_errors(
                     fh.joinuri(self.processed_files_path, sub_info.submission_id),
                     "error_report",

src/dve/pipeline/spark_pipeline.py

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,6 @@
 """Spark implementation for `Pipeline` object."""
 
+import logging
 from concurrent.futures import Executor
 from typing import Optional
 
@@ -32,6 +33,7 @@ def __init__(
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         spark: Optional[SparkSession] = None,
         job_run_id: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
     ):
         self._spark = spark if spark else SparkSession.builder.getOrCreate()
         super().__init__(
@@ -43,6 +45,7 @@ def __init__(
             submitted_files_path,
             reference_data_loader,
             job_run_id,
+            logger,
         )
 
     # pylint: disable=arguments-differ

src/dve/reporting/utils.py

Lines changed: 2 additions & 1 deletion
@@ -61,7 +61,7 @@ def dump_processing_errors(
     if not errors:
         raise AttributeError("errors list not passed")
 
-    error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json")
+    error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
     processed = []
 
     for error in errors:
@@ -71,6 +71,7 @@ def dump_processing_errors(
                 "error_location": "processing",
                 "error_level": "integrity",
                 "error_message": error.error_message,
+                "error_traceback": error.messages,
             }
         )
 
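
Each dumped record now carries the captured traceback alongside the message, and the file lands in a processing_errors/ subfolder rather than errors/. A hedged sketch of one resulting entry in processing_errors.json (values illustrative):

    record = {
        "step_name": "file_transformation",
        "error_location": "processing",
        "error_level": "integrity",
        "error_message": "ReaderLacksEntityTypeSupport()",
        # Formatted traceback lines from CriticalProcessingError.messages;
        # null when the error was created without an attached traceback.
        "error_traceback": ["Traceback (most recent call last):\n", "..."],
    }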

tests/test_pipeline/test_foundry_ddb_pipeline.py

Lines changed: 37 additions & 1 deletion
@@ -8,7 +8,7 @@
 import tempfile
 from uuid import uuid4
 
-import pytest
+import polars as pl
 
 from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
 from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
@@ -116,6 +116,42 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert not fh.get_resource_exists(report_uri)
     assert not output_loc
+
+    perror_path = Path(
+        processing_folder,
+        sub_info.submission_id,
+        "processing_errors",
+        "processing_errors.json"
+    )
+    assert perror_path.exists()
+    perror_schema = {
+        "step_name": pl.Utf8(),
+        "error_location": pl.Utf8(),
+        "error_level": pl.Utf8(),
+        "error_message": pl.Utf8(),
+        "error_traceback": pl.List(pl.Utf8()),
+    }
+    expected_error_df = (
+        pl.DataFrame(
+            [
+                {
+                    "step_name": "file_transformation",
+                    "error_location": "processing",
+                    "error_level": "integrity",
+                    "error_message": "ReaderLacksEntityTypeSupport()",
+                    "error_traceback": None,
+                },
+            ],
+            perror_schema
+        )
+        .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
+    )
+    actual_error_df = (
+        pl.read_json(perror_path, schema=perror_schema)
+        .select(pl.col("step_name"), pl.col("error_location"), pl.col("error_message"))
+    )
+    assert actual_error_df.equals(expected_error_df)
+
     assert len(list(fh.iter_prefix(audit_files))) == 2
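
Note the assertions compare only the stable columns (step_name, error_location, error_message); error_traceback is read with the schema but excluded from the equality check, presumably because traceback text varies with file paths and line numbers across environments.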
