Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 133 additions & 48 deletions data-tool/flows/colin_freeze_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import ExtractTrackingService as ColinTrackingService, ProcessingStatuses

FLOW_NAME = 'colin-freeze-flow'
ORACLE_IN_LIMIT = 1000
DEFAULT_ORACLE_CHUNK_SIZE = 1000


colin_freeze_query = """
Expand Down Expand Up @@ -61,14 +63,14 @@
def get_unprocessed_corps_query(flow_name, config, batch_size):
environment = config.DATA_LOAD_ENV
use_mig_filter = config.USE_MIGRATION_FILTER
mig_group_ids = config.MIG_GROUP_IDS
mig_batch_ids = config.MIG_BATCH_IDS
mig_group_ids = config.MIG_GROUP_IDS
mig_batch_ids = config.MIG_BATCH_IDS

cte_clause, where_clause = get_onboarding_group_subquery()

if use_mig_filter:
mig_select = "b.id AS mig_batch_id,"
mig_join = """
mig_join = """
JOIN mig_corp_batch mcb ON mcb.corp_num = c.corp_num
JOIN mig_batch b ON b.id = mcb.mig_batch_id
JOIN mig_group g ON g.id = b.mig_group_id
Expand All @@ -80,8 +82,8 @@
mig_extra += f" AND g.id IN ({mig_group_ids})"
else:
mig_select = "NULL::integer AS mig_batch_id,"
mig_join = ""
mig_extra = ""
mig_join = ""
mig_extra = ""

query = f"""
{cte_clause}
Expand Down Expand Up @@ -130,31 +132,113 @@
return corp_num


def chunk_list(values: list[str], size: int) -> list[list[str]]:
return [values[i:i + size] for i in range(0, len(values), size)]


def build_in_bind_clause(values: list[str], prefix: str = 'corp') -> tuple[str, dict]:
bind_names = [f'{prefix}_{i}' for i in range(len(values))]
clause = f"({', '.join(f':{name}' for name in bind_names)})"
params = {name: value for name, value in zip(bind_names, values)}

Check warning on line 142 in data-tool/flows/colin_freeze_flow.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this comprehension with passing the iterable to the dict constructor call

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ36FVSVcmjwdG2lpvd3&open=AZ36FVSVcmjwdG2lpvd3&pullRequest=4356
return clause, params


@task(cache_policy=NO_CACHE)
def update_colin_oracle(config, colin_oracle_engine: Engine, corp_num: str):
def update_colin_oracle_chunk(config, colin_oracle_engine: Engine, corp_nums: list[str]):
if not corp_nums:
return []

if len(corp_nums) > ORACLE_IN_LIMIT:
error = ValueError(f'Chunk size {len(corp_nums)} exceeds ORACLE_IN_LIMIT {ORACLE_IN_LIMIT}')
return [(corp_num, False, False, error) for corp_num in corp_nums]

if not config.FREEZE_COLIN_CORPS and not config.FREEZE_ADD_EARLY_ADOPTER:
return [(corp_num, False, False, None) for corp_num in corp_nums]

colin_corp_nums = [convert_to_colin_format(corp_num) for corp_num in corp_nums]
in_clause, in_params = build_in_bind_clause(colin_corp_nums)

with colin_oracle_engine.connect() as conn:
transaction = conn.begin()
frozen_colin_nums = set()
try:
res1, res2 = None, None
colin_corp_num = convert_to_colin_format(corp_num)
if config.FREEZE_COLIN_CORPS:
res1 = conn.execute(
text(colin_freeze_query),
{'corp_num': colin_corp_num}
)
select_query = f"""
SELECT c.corp_num
FROM corporation c
WHERE c.corp_num IN {in_clause}
"""
rows = conn.execute(text(select_query), in_params).fetchall()
frozen_colin_nums = {row[0] for row in rows}

freeze_query = f"""
UPDATE corporation c
SET corp_frozen_typ_cd = 'C'
WHERE c.corp_num IN {in_clause}
"""
conn.execute(text(freeze_query), in_params)

if config.FREEZE_ADD_EARLY_ADOPTER:
res2 = conn.execute(
text(colin_add_early_adopters_query),
{'corp_num': colin_corp_num}
early_adopter_params = [{'corp_num': corp_num} for corp_num in colin_corp_nums]
conn.execute(text(colin_add_early_adopters_query), early_adopter_params)
except Exception as e:
print(f'❌ Chunk statement error for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(e)}')
try:
transaction.commit()
print(f'⚠️ Chunk statement-error-path commit succeeded for {len(corp_nums)} corps ({corp_nums[:5]})')
except Exception as commit_error:
print(f'❌ Chunk statement-error-path commit failed for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(commit_error)}')
return [
(
corp_num,
convert_to_colin_format(corp_num) in frozen_colin_nums if config.FREEZE_COLIN_CORPS else False,
False,
e,
)
frozen = res1.rowcount > 0 if res1 else False
in_early_adopter = res2.rowcount > 0 if res2 else False
for corp_num in corp_nums
]

try:
transaction.commit()
return corp_num, frozen, in_early_adopter, None
except Exception as e:
transaction.rollback()
print(f'❌ Error updating {corp_num} in colin: {repr(e)}')
return corp_num, False, False, e
print(f'❌ Chunk commit error for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(e)}')
return [(corp_num, False, False, e) for corp_num in corp_nums]

results = []
for original_corp_num, colin_corp_num in zip(corp_nums, colin_corp_nums):
frozen = colin_corp_num in frozen_colin_nums if config.FREEZE_COLIN_CORPS else False
in_early_adopter = config.FREEZE_ADD_EARLY_ADOPTER
results.append((original_corp_num, frozen, in_early_adopter, None))
return results


def record_oracle_results(colin_tracking_service, flow_run_id: str, results: list[tuple]):
complete = 0
failed = 0

for corp_num, frozen, in_early_adopter, error in results:
if error:
failed += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.FAILED,
repr(error),
frozen=frozen,
in_early_adopter=in_early_adopter
)
else:
complete += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED,
error=None,
frozen=frozen,
in_early_adopter=in_early_adopter
)

return complete, failed


@flow(
Expand All @@ -174,8 +258,13 @@
raise ValueError('FREEZE_BATCHES must be explicitly set to a positive integer')
if config.FREEZE_BATCH_SIZE <= 0:
raise ValueError('FREEZE_BATCH_SIZE must be explicitly set to a positive integer')

oracle_chunk_size = getattr(config, 'FREEZE_ORACLE_CHUNK_SIZE', DEFAULT_ORACLE_CHUNK_SIZE)
if oracle_chunk_size < 1 or oracle_chunk_size > ORACLE_IN_LIMIT:
raise ValueError(f'FREEZE_ORACLE_CHUNK_SIZE must be between 1 and {ORACLE_IN_LIMIT}')

batch_size = config.FREEZE_BATCH_SIZE
batches = min(math.ceil(total/batch_size), config.FREEZE_BATCHES)
batches = min(math.ceil(total / batch_size), config.FREEZE_BATCHES)
max_num = min(total, config.FREEZE_BATCHES * config.FREEZE_BATCH_SIZE)

flow_run_id = get_run_context().flow_run.id
Expand All @@ -199,41 +288,37 @@
flow_run_id, batch_size)

if not corp_nums:
print("No more corps available to claim")
print('No more corps available to claim')
break

print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...')
chunks = chunk_list(corp_nums, oracle_chunk_size)
print(
f'👷 Start processing {len(corp_nums)} corps in '
f'{len(chunks)} Oracle chunks of <={oracle_chunk_size}: '
f'{", ".join(corp_nums[:5])}...'
)

futures = []
for corp_num in corp_nums:
for chunk in chunks:
futures.append(
update_colin_oracle.submit(
config, colin_oracle_engine, corp_num)
update_colin_oracle_chunk.submit(
config,
colin_oracle_engine,
chunk,
)
)

complete = 0
failed = 0
for f in futures:
corp_num, frozen, in_early_adopter, error = f.result()
if error:
failed += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.FAILED,
repr(error),
frozen=frozen,
in_early_adopter=in_early_adopter
)
else:
complete += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED,
error=None,
frozen=frozen,
in_early_adopter=in_early_adopter
)
results = f.result()
chunk_complete, chunk_failed = record_oracle_results(
colin_tracking_service,
flow_run_id,
results,
)
complete += chunk_complete
failed += chunk_failed

total_failed += failed
cnt += 1
Expand Down
12 changes: 12 additions & 0 deletions data-tool/flows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ def _get_bool(name: str, default: bool = False) -> bool:
return val.strip().lower() == 'true'


def _get_strict_int(name: str, default: int) -> int:
"""Strict int env parsing: unset uses default, invalid values raise."""
val = os.getenv(name)
if val is None:
return default
try:
return int(val)
except ValueError as exc:
raise ValueError(f'{name} must be a valid integer, got: {val!r}') from exc


def get_named_config(config_name: str = 'production'):
"""Return the configuration object based on the name.

Expand Down Expand Up @@ -179,6 +190,7 @@ class _Config(): # pylint: disable=too-few-public-methods
# freeze flow
FREEZE_BATCHES = _get_int('FREEZE_BATCHES', 0)
FREEZE_BATCH_SIZE = _get_int('FREEZE_BATCH_SIZE', 0)
FREEZE_ORACLE_CHUNK_SIZE = _get_strict_int('FREEZE_ORACLE_CHUNK_SIZE', 1000)

# ORACLE COLIN DB
DB_USER_COLIN_ORACLE = os.getenv('DATABASE_USERNAME_COLIN_ORACLE', '')
Expand Down