From 06a7fef4074cd9dc17ce10c5edf4b5f068cea571 Mon Sep 17 00:00:00 2001 From: argush3 Date: Tue, 5 May 2026 14:36:28 -0700 Subject: [PATCH] 33027 Update COLIN freeze flow to update in chunks --- data-tool/flows/colin_freeze_flow.py | 181 ++++++++++++++++++++------- data-tool/flows/config.py | 12 ++ 2 files changed, 145 insertions(+), 48 deletions(-) diff --git a/data-tool/flows/colin_freeze_flow.py b/data-tool/flows/colin_freeze_flow.py index 6b7c28b4f9..07f12281e1 100644 --- a/data-tool/flows/colin_freeze_flow.py +++ b/data-tool/flows/colin_freeze_flow.py @@ -11,6 +11,8 @@ import ExtractTrackingService as ColinTrackingService, ProcessingStatuses FLOW_NAME = 'colin-freeze-flow' +ORACLE_IN_LIMIT = 1000 +DEFAULT_ORACLE_CHUNK_SIZE = 1000 colin_freeze_query = """ @@ -61,14 +63,14 @@ def get_onboarding_group_subquery(): def get_unprocessed_corps_query(flow_name, config, batch_size): environment = config.DATA_LOAD_ENV use_mig_filter = config.USE_MIGRATION_FILTER - mig_group_ids = config.MIG_GROUP_IDS - mig_batch_ids = config.MIG_BATCH_IDS + mig_group_ids = config.MIG_GROUP_IDS + mig_batch_ids = config.MIG_BATCH_IDS cte_clause, where_clause = get_onboarding_group_subquery() if use_mig_filter: mig_select = "b.id AS mig_batch_id," - mig_join = """ + mig_join = """ JOIN mig_corp_batch mcb ON mcb.corp_num = c.corp_num JOIN mig_batch b ON b.id = mcb.mig_batch_id JOIN mig_group g ON g.id = b.mig_group_id @@ -80,8 +82,8 @@ def get_unprocessed_corps_query(flow_name, config, batch_size): mig_extra += f" AND g.id IN ({mig_group_ids})" else: mig_select = "NULL::integer AS mig_batch_id," - mig_join = "" - mig_extra = "" + mig_join = "" + mig_extra = "" query = f""" {cte_clause} @@ -130,31 +132,113 @@ def convert_to_colin_format(corp_num: str) -> str: return corp_num +def chunk_list(values: list[str], size: int) -> list[list[str]]: + return [values[i:i + size] for i in range(0, len(values), size)] + + +def build_in_bind_clause(values: list[str], prefix: str = 'corp') -> tuple[str, dict]: + bind_names = [f'{prefix}_{i}' for i in range(len(values))] + clause = f"({', '.join(f':{name}' for name in bind_names)})" + params = {name: value for name, value in zip(bind_names, values)} + return clause, params + + @task(cache_policy=NO_CACHE) -def update_colin_oracle(config, colin_oracle_engine: Engine, corp_num: str): +def update_colin_oracle_chunk(config, colin_oracle_engine: Engine, corp_nums: list[str]): + if not corp_nums: + return [] + + if len(corp_nums) > ORACLE_IN_LIMIT: + error = ValueError(f'Chunk size {len(corp_nums)} exceeds ORACLE_IN_LIMIT {ORACLE_IN_LIMIT}') + return [(corp_num, False, False, error) for corp_num in corp_nums] + + if not config.FREEZE_COLIN_CORPS and not config.FREEZE_ADD_EARLY_ADOPTER: + return [(corp_num, False, False, None) for corp_num in corp_nums] + + colin_corp_nums = [convert_to_colin_format(corp_num) for corp_num in corp_nums] + in_clause, in_params = build_in_bind_clause(colin_corp_nums) + with colin_oracle_engine.connect() as conn: transaction = conn.begin() + frozen_colin_nums = set() try: - res1, res2 = None, None - colin_corp_num = convert_to_colin_format(corp_num) if config.FREEZE_COLIN_CORPS: - res1 = conn.execute( - text(colin_freeze_query), - {'corp_num': colin_corp_num} - ) + select_query = f""" + SELECT c.corp_num + FROM corporation c + WHERE c.corp_num IN {in_clause} + """ + rows = conn.execute(text(select_query), in_params).fetchall() + frozen_colin_nums = {row[0] for row in rows} + + freeze_query = f""" + UPDATE corporation c + SET corp_frozen_typ_cd = 'C' + WHERE c.corp_num IN {in_clause} + """ + conn.execute(text(freeze_query), in_params) + if config.FREEZE_ADD_EARLY_ADOPTER: - res2 = conn.execute( - text(colin_add_early_adopters_query), - {'corp_num': colin_corp_num} + early_adopter_params = [{'corp_num': corp_num} for corp_num in colin_corp_nums] + conn.execute(text(colin_add_early_adopters_query), early_adopter_params) + except Exception as e: + print(f'❌ Chunk statement error for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(e)}') + try: + transaction.commit() + print(f'⚠️ Chunk statement-error-path commit succeeded for {len(corp_nums)} corps ({corp_nums[:5]})') + except Exception as commit_error: + print(f'❌ Chunk statement-error-path commit failed for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(commit_error)}') + return [ + ( + corp_num, + convert_to_colin_format(corp_num) in frozen_colin_nums if config.FREEZE_COLIN_CORPS else False, + False, + e, ) - frozen = res1.rowcount > 0 if res1 else False - in_early_adopter = res2.rowcount > 0 if res2 else False + for corp_num in corp_nums + ] + + try: transaction.commit() - return corp_num, frozen, in_early_adopter, None except Exception as e: - transaction.rollback() - print(f'❌ Error updating {corp_num} in colin: {repr(e)}') - return corp_num, False, False, e + print(f'❌ Chunk commit error for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(e)}') + return [(corp_num, False, False, e) for corp_num in corp_nums] + + results = [] + for original_corp_num, colin_corp_num in zip(corp_nums, colin_corp_nums): + frozen = colin_corp_num in frozen_colin_nums if config.FREEZE_COLIN_CORPS else False + in_early_adopter = config.FREEZE_ADD_EARLY_ADOPTER + results.append((original_corp_num, frozen, in_early_adopter, None)) + return results + + +def record_oracle_results(colin_tracking_service, flow_run_id: str, results: list[tuple]): + complete = 0 + failed = 0 + + for corp_num, frozen, in_early_adopter, error in results: + if error: + failed += 1 + colin_tracking_service.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.FAILED, + repr(error), + frozen=frozen, + in_early_adopter=in_early_adopter + ) + else: + complete += 1 + colin_tracking_service.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.COMPLETED, + error=None, + frozen=frozen, + in_early_adopter=in_early_adopter + ) + + return complete, failed @flow( @@ -174,8 +258,13 @@ def colin_freeze_flow(): raise ValueError('FREEZE_BATCHES must be explicitly set to a positive integer') if config.FREEZE_BATCH_SIZE <= 0: raise ValueError('FREEZE_BATCH_SIZE must be explicitly set to a positive integer') + + oracle_chunk_size = getattr(config, 'FREEZE_ORACLE_CHUNK_SIZE', DEFAULT_ORACLE_CHUNK_SIZE) + if oracle_chunk_size < 1 or oracle_chunk_size > ORACLE_IN_LIMIT: + raise ValueError(f'FREEZE_ORACLE_CHUNK_SIZE must be between 1 and {ORACLE_IN_LIMIT}') + batch_size = config.FREEZE_BATCH_SIZE - batches = min(math.ceil(total/batch_size), config.FREEZE_BATCHES) + batches = min(math.ceil(total / batch_size), config.FREEZE_BATCHES) max_num = min(total, config.FREEZE_BATCHES * config.FREEZE_BATCH_SIZE) flow_run_id = get_run_context().flow_run.id @@ -199,41 +288,37 @@ def colin_freeze_flow(): flow_run_id, batch_size) if not corp_nums: - print("No more corps available to claim") + print('No more corps available to claim') break - print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...') + chunks = chunk_list(corp_nums, oracle_chunk_size) + print( + f'👷 Start processing {len(corp_nums)} corps in ' + f'{len(chunks)} Oracle chunks of <={oracle_chunk_size}: ' + f'{", ".join(corp_nums[:5])}...' + ) futures = [] - for corp_num in corp_nums: + for chunk in chunks: futures.append( - update_colin_oracle.submit( - config, colin_oracle_engine, corp_num) + update_colin_oracle_chunk.submit( + config, + colin_oracle_engine, + chunk, + ) ) + complete = 0 failed = 0 for f in futures: - corp_num, frozen, in_early_adopter, error = f.result() - if error: - failed += 1 - colin_tracking_service.update_corp_status( - flow_run_id, - corp_num, - ProcessingStatuses.FAILED, - repr(error), - frozen=frozen, - in_early_adopter=in_early_adopter - ) - else: - complete += 1 - colin_tracking_service.update_corp_status( - flow_run_id, - corp_num, - ProcessingStatuses.COMPLETED, - error=None, - frozen=frozen, - in_early_adopter=in_early_adopter - ) + results = f.result() + chunk_complete, chunk_failed = record_oracle_results( + colin_tracking_service, + flow_run_id, + results, + ) + complete += chunk_complete + failed += chunk_failed total_failed += failed cnt += 1 diff --git a/data-tool/flows/config.py b/data-tool/flows/config.py index ea4a08952d..590e5783d7 100644 --- a/data-tool/flows/config.py +++ b/data-tool/flows/config.py @@ -43,6 +43,17 @@ def _get_bool(name: str, default: bool = False) -> bool: return val.strip().lower() == 'true' +def _get_strict_int(name: str, default: int) -> int: + """Strict int env parsing: unset uses default, invalid values raise.""" + val = os.getenv(name) + if val is None: + return default + try: + return int(val) + except ValueError as exc: + raise ValueError(f'{name} must be a valid integer, got: {val!r}') from exc + + def get_named_config(config_name: str = 'production'): """Return the configuration object based on the name. @@ -179,6 +190,7 @@ class _Config(): # pylint: disable=too-few-public-methods # freeze flow FREEZE_BATCHES = _get_int('FREEZE_BATCHES', 0) FREEZE_BATCH_SIZE = _get_int('FREEZE_BATCH_SIZE', 0) + FREEZE_ORACLE_CHUNK_SIZE = _get_strict_int('FREEZE_ORACLE_CHUNK_SIZE', 1000) # ORACLE COLIN DB DB_USER_COLIN_ORACLE = os.getenv('DATABASE_USERNAME_COLIN_ORACLE', '')