diff --git a/data-tool/flows/colin_freeze_flow.py b/data-tool/flows/colin_freeze_flow.py index 6b7c28b4f9..5275f5b4ca 100644 --- a/data-tool/flows/colin_freeze_flow.py +++ b/data-tool/flows/colin_freeze_flow.py @@ -1,6 +1,7 @@ import math from prefect import flow, task from common.init_utils import colin_extract_init, colin_oracle_init, get_config +from common.colin_utils import colin_oracle_chunks from sqlalchemy import Engine, text from prefect.cache_policies import NO_CACHE @@ -131,30 +132,33 @@ def convert_to_colin_format(corp_num: str) -> str: @task(cache_policy=NO_CACHE) -def update_colin_oracle(config, colin_oracle_engine: Engine, corp_num: str): +def update_colin_oracle(config, colin_oracle_engine: Engine, corp_nums: list[str]): + results = [] with colin_oracle_engine.connect() as conn: transaction = conn.begin() try: - res1, res2 = None, None - colin_corp_num = convert_to_colin_format(corp_num) - if config.FREEZE_COLIN_CORPS: - res1 = conn.execute( - text(colin_freeze_query), - {'corp_num': colin_corp_num} - ) - if config.FREEZE_ADD_EARLY_ADOPTER: - res2 = conn.execute( - text(colin_add_early_adopters_query), - {'corp_num': colin_corp_num} - ) - frozen = res1.rowcount > 0 if res1 else False - in_early_adopter = res2.rowcount > 0 if res2 else False + for corp_num in corp_nums: + res1, res2 = None, None + colin_corp_num = convert_to_colin_format(corp_num) + if config.FREEZE_COLIN_CORPS: + res1 = conn.execute( + text(colin_freeze_query), + {'corp_num': colin_corp_num} + ) + if config.FREEZE_ADD_EARLY_ADOPTER: + res2 = conn.execute( + text(colin_add_early_adopters_query), + {'corp_num': colin_corp_num} + ) + frozen = res1.rowcount > 0 if res1 else False + in_early_adopter = res2.rowcount > 0 if res2 else False + results.append((corp_num, frozen, in_early_adopter, None)) transaction.commit() - return corp_num, frozen, in_early_adopter, None + return results except Exception as e: transaction.rollback() - print(f'❌ Error updating {corp_num} in colin: {repr(e)}') - return corp_num, False, False, e + print(f'❌ Error updating colin batch: {repr(e)}') + return [(corp_num, False, False, e) for corp_num in corp_nums] @flow( @@ -205,35 +209,35 @@ def colin_freeze_flow(): print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...') futures = [] - for corp_num in corp_nums: + for corp_chunk in colin_oracle_chunks(corp_nums): futures.append( update_colin_oracle.submit( - config, colin_oracle_engine, corp_num) + config, colin_oracle_engine, corp_chunk) ) complete = 0 failed = 0 for f in futures: - corp_num, frozen, in_early_adopter, error = f.result() - if error: - failed += 1 - colin_tracking_service.update_corp_status( - flow_run_id, - corp_num, - ProcessingStatuses.FAILED, - repr(error), - frozen=frozen, - in_early_adopter=in_early_adopter - ) - else: - complete += 1 - colin_tracking_service.update_corp_status( + for corp_num, frozen, in_early_adopter, error in f.result(): + if error: + failed += 1 + colin_tracking_service.update_corp_status( flow_run_id, corp_num, - ProcessingStatuses.COMPLETED, - error=None, + ProcessingStatuses.FAILED, + repr(error), frozen=frozen, in_early_adopter=in_early_adopter - ) + ) + else: + complete += 1 + colin_tracking_service.update_corp_status( + flow_run_id, + corp_num, + ProcessingStatuses.COMPLETED, + error=None, + frozen=frozen, + in_early_adopter=in_early_adopter + ) total_failed += failed cnt += 1 diff --git a/data-tool/flows/common/colin_utils.py b/data-tool/flows/common/colin_utils.py new file mode 100644 index 0000000000..4fb69d2a60 --- /dev/null +++ b/data-tool/flows/common/colin_utils.py @@ -0,0 +1,3 @@ +def colin_oracle_chunks(items: list[str], size: int = 999): + for i in range(0, len(items), size): + yield items[i:1 + size] \ No newline at end of file