|
| 1 | +"""Demo script for pending/running task status updates.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import argparse |
| 6 | +import os |
| 7 | +import subprocess |
| 8 | +import sys |
| 9 | +from pathlib import Path |
| 10 | + |
| 11 | + |
| 12 | +def _write_tasks(path: Path, n_tasks: int, sleep_s: float, jitter_s: float) -> None: |
| 13 | + task_file = path / "task_status_demo.py" |
| 14 | + lines = [ |
| 15 | + "from __future__ import annotations", |
| 16 | + "", |
| 17 | + "from pathlib import Path", |
| 18 | + "import time", |
| 19 | + "", |
| 20 | + "from pytask import task", |
| 21 | + "", |
| 22 | + f"N_TASKS = {n_tasks}", |
| 23 | + f"SLEEP_S = {sleep_s}", |
| 24 | + f"JITTER_S = {jitter_s}", |
| 25 | + "", |
| 26 | + "for i in range(N_TASKS):", |
| 27 | + " @task(id=str(i), kwargs={'produces': Path(f'out_{i}.txt')})", |
| 28 | + " def task_sleep(produces, i=i):", |
| 29 | + " time.sleep(SLEEP_S + (i % 3) * JITTER_S)", |
| 30 | + " produces.write_text('done')", |
| 31 | + "", |
| 32 | + ] |
| 33 | + task_file.write_text("\n".join(lines), encoding="utf-8") |
| 34 | + |
| 35 | + |
| 36 | +def main() -> int: |
| 37 | + parser = argparse.ArgumentParser( |
| 38 | + description="Run a pytask demo to observe pending/running status updates." |
| 39 | + ) |
| 40 | + parser.add_argument("--n-tasks", type=int, default=30) |
| 41 | + parser.add_argument("--sleep", type=float, default=2.0) |
| 42 | + parser.add_argument("--jitter", type=float, default=0.5) |
| 43 | + parser.add_argument("--workers", type=int, default=4) |
| 44 | + parser.add_argument( |
| 45 | + "--backend", |
| 46 | + choices=["processes", "threads", "loky", "dask", "none"], |
| 47 | + default="processes", |
| 48 | + ) |
| 49 | + parser.add_argument("--entries", type=int, default=30) |
| 50 | + parser.add_argument( |
| 51 | + "--dir", |
| 52 | + type=Path, |
| 53 | + default=Path(__file__).with_name("pending_status_demo"), |
| 54 | + help="Directory to store the demo task file.", |
| 55 | + ) |
| 56 | + args = parser.parse_args() |
| 57 | + |
| 58 | + demo_dir = args.dir.resolve() |
| 59 | + demo_dir.mkdir(parents=True, exist_ok=True) |
| 60 | + |
| 61 | + for path in demo_dir.glob("out_*.txt"): |
| 62 | + path.unlink() |
| 63 | + |
| 64 | + _write_tasks(demo_dir, args.n_tasks, args.sleep, args.jitter) |
| 65 | + |
| 66 | + cmd = [ |
| 67 | + sys.executable, |
| 68 | + "-m", |
| 69 | + "pytask", |
| 70 | + demo_dir.as_posix(), |
| 71 | + "--n-workers", |
| 72 | + str(args.workers), |
| 73 | + "--parallel-backend", |
| 74 | + args.backend, |
| 75 | + "--n-entries-in-table", |
| 76 | + str(args.entries), |
| 77 | + ] |
| 78 | + print("Running:", " ".join(cmd)) |
| 79 | + env = dict(os.environ) |
| 80 | + env.setdefault("PYTHONIOENCODING", "utf-8") |
| 81 | + env.setdefault("PYTHONUTF8", "1") |
| 82 | + return subprocess.call(cmd, env=env) |
| 83 | + |
| 84 | + |
| 85 | +if __name__ == "__main__": |
| 86 | + raise SystemExit(main()) |
0 commit comments