Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 41 additions & 37 deletions data-tool/flows/colin_freeze_flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import math
from prefect import flow, task
from common.init_utils import colin_extract_init, colin_oracle_init, get_config
from common.colin_utils import colin_oracle_chunks
from sqlalchemy import Engine, text

from prefect.cache_policies import NO_CACHE
Expand Down Expand Up @@ -131,37 +132,40 @@


@task(cache_policy=NO_CACHE)
def update_colin_oracle(config, colin_oracle_engine: Engine, corp_num: str):
def update_colin_oracle(config, colin_oracle_engine: Engine, corp_nums: list[str]):
results = []
with colin_oracle_engine.connect() as conn:
transaction = conn.begin()
try:
res1, res2 = None, None
colin_corp_num = convert_to_colin_format(corp_num)
if config.FREEZE_COLIN_CORPS:
res1 = conn.execute(
text(colin_freeze_query),
{'corp_num': colin_corp_num}
)
if config.FREEZE_ADD_EARLY_ADOPTER:
res2 = conn.execute(
text(colin_add_early_adopters_query),
{'corp_num': colin_corp_num}
)
frozen = res1.rowcount > 0 if res1 else False
in_early_adopter = res2.rowcount > 0 if res2 else False
for corp_num in corp_nums:
res1, res2 = None, None
colin_corp_num = convert_to_colin_format(corp_num)
if config.FREEZE_COLIN_CORPS:
res1 = conn.execute(
text(colin_freeze_query),
{'corp_num': colin_corp_num}
)
if config.FREEZE_ADD_EARLY_ADOPTER:
res2 = conn.execute(
text(colin_add_early_adopters_query),
{'corp_num': colin_corp_num}
)
frozen = res1.rowcount > 0 if res1 else False
in_early_adopter = res2.rowcount > 0 if res2 else False
results.append((corp_num, frozen, in_early_adopter, None))
transaction.commit()
return corp_num, frozen, in_early_adopter, None
return results
except Exception as e:
transaction.rollback()
print(f'❌ Error updating {corp_num} in colin: {repr(e)}')
return corp_num, False, False, e
print(f'❌ Error updating colin batch: {repr(e)}')
return [(corp_num, False, False, e) for corp_num in corp_nums]


@flow(
name='Colin-Freeze-Flow',
log_prints=True,
)
def colin_freeze_flow():

Check failure on line 168 in data-tool/flows/colin_freeze_flow.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ301-Jy87P6yfrIkQvL&open=AZ301-Jy87P6yfrIkQvL&pullRequest=4355
try:
config = get_config()
colin_oracle_engine = colin_oracle_init(config)
Expand Down Expand Up @@ -205,35 +209,35 @@
print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...')

futures = []
for corp_num in corp_nums:
for corp_chunk in colin_oracle_chunks(corp_nums):
futures.append(
update_colin_oracle.submit(
config, colin_oracle_engine, corp_num)
config, colin_oracle_engine, corp_chunk)
)
complete = 0
failed = 0
for f in futures:
corp_num, frozen, in_early_adopter, error = f.result()
if error:
failed += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.FAILED,
repr(error),
frozen=frozen,
in_early_adopter=in_early_adopter
)
else:
complete += 1
colin_tracking_service.update_corp_status(
for corp_num, frozen, in_early_adopter, error in f.result():
if error:
failed += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED,
error=None,
ProcessingStatuses.FAILED,
repr(error),
frozen=frozen,
in_early_adopter=in_early_adopter
)
)
else:
complete += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED,
error=None,
frozen=frozen,
in_early_adopter=in_early_adopter
)

total_failed += failed
cnt += 1
Expand Down
3 changes: 3 additions & 0 deletions data-tool/flows/common/colin_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def colin_oracle_chunks(items: list[str], size: int = 999):
for i in range(0, len(items), size):
yield items[i:1 + size]