Skip to content

Commit 836d627

Browse files
damageboy and claude committed
feat: add --max-tasks-per-child flag and unify Pool usage for memory control
Replace ProcessPoolExecutor with multiprocessing.Pool in pipelined stage processing to enable maxtasksperchild, which recycles worker processes after N tasks to limit memory growth in long runs. Uses apply_async with a callback queue instead of concurrent.futures wait(FIRST_COMPLETED).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 73906ed commit 836d627

2 files changed

Lines changed: 133 additions & 63 deletions

File tree

vxsort/smallsort/codegen/src/bitonic_compiler.py

Lines changed: 32 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ def _run_verification(
5252
prim_type: primitive_type,
5353
natural_order: bool,
5454
max_workers: int | None = None,
55+
max_tasks_per_child: int | None = 1000,
5556
):
5657
"""Run Z3 end-to-end verification on a list of paths using multiprocessing.
5758
@@ -78,7 +79,7 @@ def _run_verification(
7879

7980
failures = []
8081
try:
81-
with Pool(processes=max_workers) as pool:
82+
with Pool(processes=max_workers, maxtasksperchild=max_tasks_per_child) as pool:
8283
for path_index, result in pool.imap_unordered(_verify_path_worker, jobs):
8384
progress.update(task_id, advance=1, success=1 if result.verified else 0)
8485
if not result.verified:
@@ -103,6 +104,7 @@ def verify_only_from_json(
103104
estimate: bool = False,
104105
nasm_path: str | None = None,
105106
max_workers: int | None = None,
107+
max_tasks_per_child: int | None = 1000,
106108
):
107109
"""Load solutions from a JSON file and verify them without re-running synthesis.
108110
@@ -139,7 +141,12 @@ def verify_only_from_json(
139141
paths = path_selector.select_top_k_paths(bundle.roots, top_k or 10_000)
140142

141143
_run_verification(
142-
paths, vm, prim_type, bundle.natural_order, max_workers=max_workers
144+
paths,
145+
vm,
146+
prim_type,
147+
bundle.natural_order,
148+
max_workers=max_workers,
149+
max_tasks_per_child=max_tasks_per_child,
143150
)
144151

145152
if estimate:
@@ -259,6 +266,7 @@ def generate_bitonic_sorter(
259266
nasm_path: str | None = None,
260267
no_pipeline: bool = False,
261268
max_workers: int | None = None,
269+
max_tasks_per_child: int | None = 1000,
262270
):
263271
"""
264272
Generate bitonic sorter with super-optimized permutation sequences.
@@ -286,6 +294,8 @@ def generate_bitonic_sorter(
286294
estimate: If True, run OSACA performance estimation on selected paths.
287295
nasm_path: Path to nasm binary for assembly verification (used with --estimate).
288296
no_pipeline: If True, disable pipelined stage processing.
297+
max_tasks_per_child: Maximum tasks per worker process before recycling.
298+
Limits memory growth in long runs. None disables recycling.
289299
290300
Returns:
291301
List of SolutionNode trees representing different optimized solutions
@@ -350,6 +360,7 @@ def generate_bitonic_sorter(
350360
resume_data=resume_data,
351361
pipeline=not no_pipeline,
352362
max_workers=max_workers,
363+
max_tasks_per_child=max_tasks_per_child,
353364
)
354365

355366
print(f"Found {len(solutions)} root solutions")
@@ -415,7 +426,12 @@ def generate_bitonic_sorter(
415426
)
416427

417428
_run_verification(
418-
paths_to_verify, vm, prim_type, natural_order, max_workers=max_workers
429+
paths_to_verify,
430+
vm,
431+
prim_type,
432+
natural_order,
433+
max_workers=max_workers,
434+
max_tasks_per_child=max_tasks_per_child,
419435
)
420436

421437
# OSACA performance estimation
@@ -584,6 +600,14 @@ def generate_bitonic_sorter(
584600
help="Maximum number of worker processes for parallel synthesis and verification. "
585601
"Defaults to os.cpu_count(). Use this to limit memory usage on large machines.",
586602
)
603+
parser.add_argument(
604+
"--max-tasks-per-child",
605+
type=int,
606+
default=1000,
607+
metavar="N",
608+
help="Maximum tasks per worker process before recycling (default: 1000). "
609+
"Limits memory growth in long runs. Set to 0 to disable recycling.",
610+
)
587611
parser.add_argument(
588612
"--list-cpus",
589613
action="store_true",
@@ -593,6 +617,9 @@ def generate_bitonic_sorter(
593617

594618
args = parser.parse_args()
595619

620+
# Convert 0 → None (Pool interprets None as "no limit")
621+
max_tasks_per_child = args.max_tasks_per_child or None
622+
596623
# --list-cpus mode: print supported architectures and exit
597624
if args.list_cpus:
598625
from uops_parser import list_available_architectures
@@ -623,6 +650,7 @@ def generate_bitonic_sorter(
623650
estimate=args.estimate,
624651
nasm_path=args.nasm_path,
625652
max_workers=args.max_workers,
653+
max_tasks_per_child=max_tasks_per_child,
626654
)
627655
raise SystemExit(0)
628656

@@ -668,4 +696,5 @@ def generate_bitonic_sorter(
668696
nasm_path=args.nasm_path,
669697
no_pipeline=args.no_pipeline,
670698
max_workers=args.max_workers,
699+
max_tasks_per_child=max_tasks_per_child,
671700
)

0 commit comments

Comments
 (0)