B1: aorta run - universal workload runner #156
Conversation
Pull request overview
Adds the new aorta run “universal workload runner” (library API + CLI) that discovers workloads via the aorta.workloads entry-point group, runs N trials, captures per-trial metadata/results, and persists JSON outputs (rank-aware for distributed runs).
Changes:
- Introduces the aorta.run package: workload discovery, launch-mode validation, dispatcher orchestration, and the TrialResult schema/serialization.
- Replaces the aorta run CLI stub with a real Click command that builds a RunRequest and invokes run_trials().
- Adds a comprehensive tests/run/ suite covering CLI parsing, discovery, validation, dispatcher behavior, stubs, and end-to-end flows.
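The trial loop described above (discover a workload class, run N trials, persist one JSON file per trial) can be sketched in heavily simplified form. Everything here is illustrative: `FakeWorkload` and `run_trials_sketch` are hypothetical stand-ins, not the real `aorta.run` API, and rank-aware write logic is omitted.

```python
import json
import time
from pathlib import Path


class FakeWorkload:
    """Hypothetical stand-in for an entry-point-discovered workload."""

    def __init__(self, config):
        self.config = config

    def setup(self):
        pass

    def run(self):
        return {"loss": 0.0, "steps": self.config.get("steps", 1)}

    def cleanup(self):
        pass


def run_trials_sketch(workload_cls, trials, results_dir):
    """Run N trials and persist one JSON record per trial (sketch only)."""
    out_dir = Path(results_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    results = []
    for t in range(trials):
        start = time.perf_counter()
        wl = workload_cls({"steps": 10})
        wl.setup()
        result = wl.run()
        wl.cleanup()
        record = {
            "trial_id": f"fake_t{t}",
            "result": result,
            "wall_clock_sec": time.perf_counter() - start,
            "exit_status": "ok",
        }
        (out_dir / f"trial_{t}.json").write_text(json.dumps(record))
        results.append(record)
    return results
```

The real dispatcher adds launch-mode validation, mitigation env handling, and error mapping on top of this skeleton.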
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/aorta/cli/run.py | Implements the aorta run CLI wrapper, argument parsing, and user-facing reporting. |
| src/aorta/run/__init__.py | Exposes the public aorta.run API (RunRequest, TrialResult, run_trials). |
| src/aorta/run/_stubs.py | Temporary A1/B3 stubs to unblock B1 while dependencies land. |
| src/aorta/run/collectors.py | Defines reserved/known collector recipe names for validation. |
| src/aorta/run/discovery.py | Entry-point workload discovery and lookup helper. |
| src/aorta/run/dispatcher.py | Core orchestration loop: resolve env/mitigations, run trials, persist results. |
| src/aorta/run/results.py | TrialResult dataclass and JSON-compatible (de)serialization. |
| src/aorta/run/validation.py | Launch-mode validation against WORLD_SIZE and workload declarations. |
| tests/run/__init__.py | Test package marker for tests/run. |
| tests/run/test_cli_parsing.py | Validates CLI parsing/validation behavior. |
| tests/run/test_discovery.py | Tests workload discovery + lookup behavior. |
| tests/run/test_dispatcher.py | Tests dispatcher lifecycle, env handling, persistence, error resilience. |
| tests/run/test_integration.py | End-to-end tests for CLI + dispatcher + JSON output. |
| tests/run/test_results.py | Tests TrialResult round-trip and schema defaults. |
| tests/run/test_stubs.py | Tests stubbed A1/B3 behavior used by B1. |
| tests/run/test_validation.py | Tests launch-mode validation behavior and error messages. |
```python
start_time = time.time()

# 5. Create results directory
results_dir = request.results_dir / request.workload
results_dir.mkdir(parents=True, exist_ok=True)

# 6. Determine if we should write (rank 0 only for distributed)
rank = int(os.environ.get("RANK", "0"))
should_write = rank == 0
```
```python
# Handle both Python 3.10+ (select) and 3.9 (get) APIs
if hasattr(eps, "select"):
    group = eps.select(group="aorta.workloads")
else:
    group = eps.get("aorta.workloads", [])
```
```python
class TestPython39Compatibility:
    """Tests for Python 3.9 API compatibility."""

    def test_uses_get_api_when_select_not_available(self):
        """Falls back to .get() API for Python 3.9."""
        mock_ep = MagicMock()
        mock_ep.name = "fallback_workload"
        mock_ep.load.return_value = MockWorkload

        # Mock entry_points without select method (Python 3.9 API)
        mock_eps = {"aorta.workloads": [mock_ep]}

        with patch("importlib.metadata.entry_points", return_value=mock_eps):
            workloads = discover_workloads()

        assert "fallback_workload" in workloads
```
```python
workload_cls = get_workload_class(request.workload)

# 2. Validate launch mode BEFORE setup()
validate_launch_mode(workload_cls)
```
```python
        RuntimeError: Workload 'FsdpWorkload' requires WORLD_SIZE >= 2
            (got 1); launch with: torchrun --nproc_per_node=2 -m aorta run ...
    """
    world_size = int(os.environ.get("WORLD_SIZE", "1"))
```
```python
workload = workload_cls(config=config)
workload.setup()
workload_result = workload.run()
workload.cleanup()
```
```python
workload_result: WorkloadResult

try:
    workload = workload_cls(config=config)
```
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.
```python
collect: tuple[str, ...] = field(default_factory=tuple)


def run_trials(request: RunRequest) -> list[TrialResult]:
```
```python
# rank causes shared-FS contention and weakens the rank-0-only
# write guarantee.
rank = int(os.environ.get("RANK", "0"))
should_write = rank == 0
results_dir = request.results_dir / request.workload
```
```python
for ep in group:
    try:
        cls = ep.load()
        workloads[ep.name] = cls
    except Exception as e:
        # Log but don't crash - allow other workloads to load
        print(f"Warning: Failed to load workload '{ep.name}': {e}")
```
```python
trial_id: str
workload: str
execution_env: dict[str, Any]
mitigations_applied: tuple[str, ...]
config: dict[str, Any]
env: dict[str, Any]
result: dict[str, Any]
wall_clock_sec: float
```
```python
if "=" not in pair:
    raise click.ClickException(
        f"Invalid extra-env format: '{pair}'. Expected KEY=VALUE."
    )
k, v = pair.split("=", 1)
extra_env_dict[k.strip()] = v.strip()
```
Implements GitHub issue #148 in two phases:

- B1.0: Interface stub (types, contracts, signatures)
- B1.1: Full implementation (dispatcher, validation, JSON persistence)

## Core Features

- Library-first API: run_trials(RunRequest) -> list[TrialResult]
- CLI wrapper: aorta run --workload <name> --trials N [options]
- Workload discovery via importlib.metadata entry-points
- Launch mode validation (single_process vs distributed)
- Mitigation env-var union with proper override order
- Rank-aware JSON writes (rank 0 only for distributed workloads)
- Error resilience: individual trial failures don't stop the run
- Environment restoration after each trial

## Files Created

Source (src/aorta/run/):
- __init__.py - Public API exports
- results.py - TrialResult dataclass with JSON serialization
- collectors.py - Collector recipe validation (MVP no-op)
- dispatcher.py - Core orchestration with run_trials()
- discovery.py - Entry-point workload discovery
- validation.py - Launch mode validation
- _stubs.py - Temporary stubs for A1/B3 dependencies

Tests (tests/run/):
- test_results.py (6 tests)
- test_cli_parsing.py (14 tests)
- test_validation.py (12 tests)
- test_discovery.py (9 tests)
- test_dispatcher.py (18 tests)
- test_integration.py (15 tests)
- test_stubs.py (15 tests)

Total: 89 tests covering serialization, CLI parsing, validation, discovery, dispatcher logic, and end-to-end scenarios.

## Files Modified

- src/aorta/cli/run.py
  - Replaced stub with full implementation
  - Added --environment, --collect, --extra-env options
  - Changed --dockers (plural) to --environment (single)
  - Thin wrapper (<30 lines) that calls run_trials()

## Dependencies

- A1 (env probe): Stubbed temporarily, will use real implementation when PR #152 merges
- B3 (registry): Stubbed temporarily, will replace when B3 lands

## Schema

TrialResult schema version 0.1 (unstable until external consumers pin):
- trial_id, workload, execution_env, mitigations_applied
- config, env (EnvSnapshot), result (WorkloadResult)
- wall_clock_sec, exit_status (ok/workload_failed/infrastructure_failed/timeout)

## Verification

Run tests: pytest tests/run/ -v
Manual test: aorta run --workload fsdp --trials 1 --steps 10

## Follow-up Work

- When PR #152 merges: Replace _stubs.collect_env with real A1
- When B3 lands: Replace _stubs.get_environment/get_mitigation with B3 registry

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
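The "mitigation env-var union with proper override order" feature above can be sketched as a plain dict merge; `build_trial_env` is a hypothetical helper written for illustration, not the repo's actual function.

```python
def build_trial_env(mitigation_envs, extra_env):
    """Union mitigation env vars in order, then let extra_env override --
    the 'mitigations then extra_env' precedence described above (sketch)."""
    merged = {}
    for env in mitigation_envs:  # later mitigations win over earlier ones
        merged.update(env)
    merged.update(extra_env)     # user-supplied extra_env wins overall
    return merged
```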
* dispatcher: switch trial timing to ``time.perf_counter`` -- monotonic so
  ``wall_clock_sec`` is robust against system-clock jumps.
* dispatcher: validate ``request.collect`` against KNOWN_RECIPES at the
  library entry point, mirroring the CLI's check; B2 and other programmatic
  callers now get the same behaviour.
* dispatcher: only rank 0 creates ``results_dir`` (avoids shared-FS
  contention and tightens the rank-0-only write guarantee).
* dispatcher: move ``workload.cleanup()`` into the ``finally`` block with a
  workload-exists guard, so setup()/run() failures no longer leak resources;
  cleanup errors are swallowed so they cannot mask the original
  exception/exit_status.
* dispatcher: construct workloads positionally as ``workload_cls(config)``
  to match the documented Workload(config) contract -- third-party plugins
  are free to name their first parameter anything.
* validation: catch a non-int ``WORLD_SIZE`` and re-raise a clean
  ``RuntimeError`` instead of leaking ``ValueError`` through the
  early-validation path.
* discovery: drop the ``EntryPoints.get`` fallback; ``requires-python`` is
  >=3.10, so ``select`` is always available, and the test for the fallback
  is removed.

All 88 (formerly 89) tests in tests/run/ pass.

Co-authored-by: Cursor <cursoragent@cursor.com>
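The cleanup-in-finally change described above can be sketched as follows. This is a simplified illustration (hypothetical `run_one_trial`, collapsed error classification), not the dispatcher's actual code.

```python
import time


def run_one_trial(workload_cls, config):
    """Run one trial: cleanup always runs if the workload was constructed,
    and a cleanup error never masks the original failure (sketch)."""
    workload = None
    start = time.perf_counter()
    exit_status = "ok"
    result = None
    try:
        # Positional construction: plugins may name the first param anything.
        workload = workload_cls(config)
        workload.setup()
        result = workload.run()
    except Exception:
        exit_status = "workload_failed"
    finally:
        if workload is not None:  # setup()/run() may have failed before construction
            try:
                workload.cleanup()
            except Exception:
                pass  # must not mask the original exception/exit_status
    return {
        "result": result,
        "exit_status": exit_status,
        "wall_clock_sec": time.perf_counter() - start,  # monotonic timing
    }
```

The real implementation distinguishes more exit statuses (e.g. infrastructure failures) and logs cleanup errors rather than silently passing.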
* dispatcher: reject ``request.trials < 1`` up-front instead of silently
  running zero trials.
* dispatcher: parse ``RANK`` defensively -- a misconfigured launcher that
  sets ``RANK`` to a non-integer no longer crashes the run; we fall back to
  rank 0 with a logged warning.
* discovery: switch the load-failure warning from ``print()`` to a module
  logger so library callers can control verbosity / capture.
* discovery: validate that each entry-point resolves to a class that is a
  ``Workload`` subclass; mis-registered plugins (functions, unrelated
  classes, instances) are now skipped with a clear warning instead of being
  returned and failing later inside the dispatcher.
* results: ``TrialResult`` is ``frozen=True`` but its mutable dict fields
  could still be mutated through the original references. Deep-copy
  ``execution_env`` / ``config`` / ``env`` / ``result`` in ``__post_init__``
  and again in ``to_dict()`` so the dataclass is effectively immutable from
  construction time.
* cli: ``--extra-env`` now rejects empty keys (``=value``) and keys that
  don't match the env-var name pattern, with a clear ``ClickException`` at
  parse time instead of a deep ``ValueError`` later when
  ``os.environ.update`` is called.
* tests: cover all six new behaviours (95 passing, was 88).

Co-authored-by: Cursor <cursoragent@cursor.com>
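The defensive ``RANK`` parsing described above might look roughly like this (hypothetical `read_rank` helper; the real code also emits a logged warning on fallback):

```python
import os


def read_rank(environ=None):
    """Parse RANK defensively: a non-integer value falls back to rank 0
    instead of crashing the run (sketch of the behaviour described above)."""
    environ = os.environ if environ is None else environ
    raw = environ.get("RANK", "0")
    try:
        return int(raw)
    except ValueError:
        # Real code logs a warning here before falling back to rank 0.
        return 0
```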
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
src/aorta/cli/run.py:50
`--mitigations-file` / `mitigation_files` is parsed but never used. Either plumb these paths through to the registry loaders (so ad-hoc sidecars actually affect environment/mitigation resolution) or remove the option until it's implemented; as-is it is misleading, and the help text claims consumption will happen in this task.
```python
@click.option(
    "--mitigations-file",
    "mitigation_files",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    multiple=True,
    help=(
        "JSON file with ad-hoc mitigations and/or environments (repeatable). "
        "Click verifies the file exists; JSON parsing + consumption land with task B1."
    ),
)
```
```python
logger = logging.getLogger(__name__)

from aorta.workloads import Workload, WorkloadResult
from aorta.run.collectors import KNOWN_RECIPES
from aorta.run.discovery import get_workload_class
from aorta.run.validation import validate_launch_mode
from aorta.run.results import TrialResult

# Import from stubs (replace when A1/B3 land)
from aorta.run._stubs import (
    collect_env,
    get_environment,
    get_mitigation,
    Environment,
)
```
```python
# Capture environment snapshot (using stub, replace with A1 when available)
env_snapshot = collect_env()

# Build config
config: dict[str, Any] = {**request.config_overrides}
if request.steps is not None:
    config["steps"] = request.steps

# Save original environment for restoration
original_env = dict(os.environ)

# Apply mitigation env + extra_env
os.environ.update(mitigation_env)
os.environ.update(request.extra_env)
```
```python
@click.option(
    "--results-dir",
    type=click.Path(file_okay=False, writable=True),
    default="results",
    show_default=True,
    help="Directory to write per-trial JSON.",
)
```
```python
"""Temporary stubs for B3 registry and A1 environment probe.

Remove this file when B3 lands: src/aorta/registry/ with get_mitigation()
and get_environment().

When A1 lands (PR #152): Replace collect_env and EnvSnapshot imports with:
    from aorta.instrumentation.environment import collect_env, EnvSnapshot

Note: These stubs provide minimal implementations that allow B1 to function
while dependencies are being developed. They should be replaced with real
implementations as soon as those dependencies are available.
"""
```
* dispatcher: drop the temporary ``aorta.run._stubs`` shim and wire
directly to the real platform APIs:
- ``collect_env`` from ``aorta.instrumentation.environment`` (A1,
PR #152).
- ``Environment`` / ``get_environment`` / ``get_mitigation`` from
``aorta.registry`` (B3 + B3.1, PR #159).
Knock-on cleanup:
- ``get_mitigation`` now returns a defensive ``dict[str, str]``
directly, so the union loop is one line shorter.
- ``execution_env`` mirrors the real ``Environment`` schema --
``kind`` / ``rocm`` / ``digest`` were stub-isms; ROCm version now
lives inside ``env_snapshot.rocm`` and the runtime kind in
``env_snapshot.runtime_context.type`` from A1.
- ``aorta.registry.UnknownEnvironmentError`` /
``UnknownMitigationError`` are ``KeyError`` subclasses, so the
CLI catches both ``ValueError`` and ``LookupError``.
* dispatcher: capture ``collect_env()`` AFTER applying mitigation +
extra-env so the persisted snapshot reflects the actual environment
the workload ran under (e.g. ``DISABLE_TF32=1`` from ``tf32_off``).
* dispatcher: move ``logger = logging.getLogger(__name__)`` below the
imports so the import block is contiguous (PEP 8 / ruff E402).
* cli/run.py: drop ``click.Path(..., writable=True)`` on
``--results-dir`` -- Click's writable check rejects paths that
don't already exist, breaking the default ``results`` directory on
a fresh checkout. Switch to ``path_type=Path`` and let the
dispatcher's ``mkdir`` surface I/O errors.
* tests:
- delete ``tests/run/test_stubs.py`` (covered the now-deleted
stub module).
- ``test_integration.py``: assert against the real ``Environment``
/ ``EnvSnapshot`` shapes; adjust the unknown-env / unknown-mit
cases to expect the registry's ``UnknownEnvironmentError`` /
``UnknownMitigationError``.
- new ``test_env_snapshot_reflects_mitigation_env`` pins the
ordering fix.
- new ``test_default_results_dir_does_not_require_existing_path``
pins the ``--results-dir`` Click validation fix.
80 passing in ``tests/run/``.
Co-authored-by: Cursor <cursoragent@cursor.com>
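The snapshot-ordering fix above means the persisted snapshot sees the mitigation variables. A rough sketch of that ordering, with a hypothetical `snapshot_env` callable standing in for the real `collect_env`, and a key-level restore in `finally`:

```python
import os


def run_with_mitigation(mitigation_env, extra_env, snapshot_env):
    """Apply mitigation env, then extra_env, THEN snapshot -- so the
    persisted snapshot reflects what the workload actually ran under."""
    applied = {**mitigation_env, **extra_env}  # extra_env wins on collisions
    saved = {k: os.environ.get(k) for k in applied}  # remember prior values
    try:
        os.environ.update(applied)
        # Snapshot AFTER applying env, e.g. DISABLE_TF32=1 from tf32_off.
        return snapshot_env()
    finally:
        for k, old in saved.items():  # restore only the keys we touched
            if old is None:
                os.environ.pop(k, None)
            else:
                os.environ[k] = old
```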
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 9 comments.
Comments suppressed due to low confidence (1)
src/aorta/cli/run.py:46
`--mitigations-file` / `mitigation_files` is accepted by the CLI but never used when building the RunRequest, so users can pass this flag and it will be silently ignored. Either implement consumption of these JSON files (e.g., load sidecars into the registry) or remove/disable the flag until it is supported.
```python
    "--mitigations-file",
    "mitigation_files",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    multiple=True,
    help=(
```
```python
import os
import pytest
from pathlib import Path

from aorta.cli.run import run as run_cmd
from aorta.run.dispatcher import RunRequest, run_trials
from aorta.run.results import TrialResult
```
```python
from aorta.run.dispatcher import RunRequest, run_trials, _run_single_trial
from aorta.run.results import TrialResult
```
```python
# Build config overrides
config_overrides: dict = {}
if steps is not None:
    config_overrides["steps"] = steps
```
```python
try:
    workload.cleanup()
except Exception:
    pass
```
```python
schema_version: Version of the result schema (for future migration).
trial_id: Unique identifier for this trial (e.g., "fsdp_t0").
workload: Name of the workload that was executed.
execution_env: Environment descriptor as dict (kind, name, image, etc.).
```
```python
workloads = discover_workloads()
assert isinstance(workloads, dict)

def test_registered_workloads_are_found(self):
    """Entry-point registered workloads are discovered."""
    # Note: fsdp is registered in pyproject.toml but the class
    # doesn't exist yet. This test verifies discovery attempts to load.
    workloads = discover_workloads()
    # The dict exists even if loading fails
    assert isinstance(workloads, dict)
```
Three rounds of review on this PR fell into a small set of recurring
patterns that I had been fixing one comment at a time. This commit
runs the audits up-front so the next review round doesn't repeat them.
Lint / style (ruff E,F,W,I,N,UP,B,C4 -- repo's pyproject config):
* ``ruff check --fix`` resolves 18 mechanical findings -- unused
imports (F401), unsorted imports (I001), ``Type[X] -> type[X]``
(UP006/UP035) on the project's ``requires-python = ">=3.10"`` floor.
* Manual fixes for the remaining 5:
- ``raise ClickException(...) from e`` in cli/run.py (B904) so
the ClickException doesn't shadow the underlying registry /
dispatcher exception in tracebacks.
- lift ``_ENV_KEY_RE`` to module scope in cli/run.py (N806) so
the regex compiles once and the function-local name doesn't
trip the upper-case-in-function rule.
- ``pytest.raises(FrozenInstanceError)`` (was the much-too-broad
``Exception``) for both frozen-dataclass tests (B017).
* ``ruff format`` on the touched files -- previously 4 of them would
reformat under the project's line-length=100 config.
Library/CLI parity (same class as the round-1 ``request.collect`` and
round-2 ``extra_env`` comments -- generalised this round):
* ``run_trials`` now validates ``extra_env`` keys against the same
POSIX env-var name regex the CLI applies at parse time. Library
callers (B2 triage matrix, future programmatic users) bypass the
CLI parser; without this they would only fail mid-trial inside
``os.environ.update`` with the unfriendly stdlib message.
Migration debt (same class as the round-3 ``_stubs.py`` comment):
* ``--mitigations-file`` was being parsed and dropped on the floor.
The help text still claimed "consumption land with task B1" -- and
this is task B1. Now that B3.1 has landed, the registry's
``get_mitigation`` / ``get_environment`` accept ``extra_files``.
Wire it through:
- ``RunRequest`` gets a ``sidecar_files: tuple[Path, ...]`` field.
- the CLI passes ``mitigation_files`` into it.
- the dispatcher forwards to both registry calls so that sidecar
mitigations and environments resolve in the same lookup as
built-ins and entry-point plugins.
- the CLI's exception bridge now also catches ``RegistryError``
(malformed sidecar / collisions) so those surface as a clean
``ClickException`` instead of a Python traceback.
Resource lifecycle (same class as the round-1 ``cleanup() in finally``
comment):
* ``workload.cleanup()`` failures are now logged via
``logger.warning(..., exc_info=True)`` instead of silently
swallowed. Silent swallowing made leaked GPU memory / process
groups / file handles invisible to the operator -- the original
trial outcome is still preserved (cleanup failures must not mask
the original exception/exit_status).
Tests pinning the new contracts:
* ``test_rejects_invalid_extra_env_keys`` -- empty / numeric-leading
/ whitespace / ``=foo`` keys all raise ``ValueError`` from
``run_trials`` (not from ``os.environ.update`` mid-trial).
* ``test_cleanup_error_is_logged_not_swallowed`` -- a ``cleanup()``
that raises is logged with type + trial_id while leaving
``exit_status == "ok"``.
* ``test_sidecar_files_are_forwarded_to_registry`` -- writes a real
B3.1 sidecar JSON declaring a custom mitigation, drives a trial
through it, and asserts the env var actually reaches the workload's
``setup()``. Regression guard for the "parsed and dropped" bug.
83 passing in tests/run/ (was 80). ``ruff check`` clean. ``ruff
format --check`` clean. ``mypy`` clean across the touched src files.
Co-authored-by: Cursor <cursoragent@cursor.com>
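The library/CLI parity check described above can be sketched as below. The regex is an assumption about what a POSIX-style env-var name pattern looks like; the repo's actual `_ENV_KEY_RE` may differ.

```python
import re

# Assumed POSIX-style env-var name pattern (hypothetical, may differ
# from the repo's real _ENV_KEY_RE).
ENV_KEY_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_extra_env(extra_env):
    """Reject bad keys up-front with a friendly error, instead of letting
    os.environ.update fail mid-trial with the unfriendly stdlib message."""
    for key in extra_env:
        if not ENV_KEY_RE.fullmatch(key):
            raise ValueError(f"Invalid extra-env key: {key!r}")
```

Running this check in both the CLI parser and `run_trials` gives library callers the same early failure the CLI users get.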
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
```python
raw_world_size = os.environ.get("WORLD_SIZE", "1")
try:
    world_size = int(raw_world_size)
except ValueError as e:
    raise RuntimeError(
        f"Invalid WORLD_SIZE={raw_world_size!r}: expected an integer "
        "(launchers should set WORLD_SIZE to the rank count)."
    ) from e
launch_mode = workload_cls.launch_mode
min_world_size = workload_cls.min_world_size

if launch_mode == "single_process" and world_size > 1:
    raise RuntimeError(
```
```python
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
```
```python
schema_version: Version of the result schema (for future migration).
trial_id: Unique identifier for this trial (e.g., "fsdp_t0").
workload: Name of the workload that was executed.
execution_env: Environment descriptor as dict (kind, name, image, etc.).
```
```python
# Log but don't crash - allow other workloads to load. Use
# a logger (not print) so library callers can control
# verbosity and filter/redirect normally.
logger.warning("Failed to load workload '%s': %s", ep.name, e)
```
Triages the two latest review rounds against the current branch and folds in
every distinct comment. After the previous "proactive audit" push, the
reviewer flagged six new issues; four are exact replays of classes I'd fixed
elsewhere, so the fix here is to apply the same treatment uniformly.

Code:
* cli/run.py: stop also writing ``steps`` into ``config_overrides``.
  ``RunRequest.steps`` is the canonical home; carrying the same value
  through two paths is ambiguous if a future caller writes only one.
* run/dispatcher.py: ``RunRequest.__post_init__`` deep-copies ``extra_env``
  and ``config_overrides``. ``frozen=True`` only blocks attribute
  reassignment; the dicts were still mutable through the caller's reference,
  mirroring a class of bug already fixed for ``TrialResult`` in round 2.
  Generalizing now.
* run/dispatcher.py: replace ``os.environ.clear() +
  os.environ.update(snapshot)`` with a diff-based restore. ``run_trials`` is
  a public library API, so the global wipe-and-repopulate had a window
  during which other threads in the process saw an empty environment. The
  diff approach transitions each key at most once, directly to its target
  value, and still cleans up workload-side mutations.
* run/discovery.py: ``logger.warning(..., exc_info=True)`` for failed
  entry-point loads. Without ``exc_info`` the most common failure mode (an
  ImportError chain inside the plugin module) was effectively undiagnosable.
* run/validation.py: reject ``WORLD_SIZE < 1`` up-front. The previous
  branches only checked ``> 1`` / ``< min``, so ``WORLD_SIZE=0`` and
  negatives fell into the ``single_process`` happy path -- same family as
  the round-2 ``trials >= 1`` and defensive ``RANK`` parsing fixes.
* run/results.py: update the ``TrialResult`` docstring to describe the
  ``execution_env`` shape that's actually emitted (mirroring
  ``aorta.registry.Environment``: ``name / docker / venv / source_package``)
  and clarify that ``rocm`` / runtime kind / docker digest live inside
  ``env`` (the runtime ``EnvSnapshot``), not inside ``execution_env`` (the
  static descriptor).

Tests (4 new, 2 hardened):
* test_validation.py: ``test_world_size_zero_raises`` and
  ``test_world_size_negative_raises`` pin the new ``< 1`` rejection.
* test_dispatcher.py: ``test_mutable_fields_are_defensively_copied`` pins
  ``RunRequest`` defensive deep-copy, including nested-dict aliasing.
* test_dispatcher.py: ``test_environment_restore_does_not_use_global_clear``
  runs a real trial with a tracking shim around ``os.environ.clear`` and
  asserts the dispatcher never invokes it. Pins the thread-safety property.
* test_discovery.py: hardened ``test_handles_load_failure_gracefully`` to
  assert ``exc_info`` is attached and points at the original exception type,
  not just that *some* warning was emitted.
* test_discovery.py: patched ``importlib.metadata.entry_points`` in
  ``test_returns_dict`` and ``test_registered_workloads_are_found`` so they
  no longer depend on whatever plugins happen to be installed in the
  ambient environment.

Verification:
* ruff check / ruff format / mypy: clean across all touched files.
* pytest tests/run/: 87 passing (was 83; +4 new tests).

Co-authored-by: Cursor <cursoragent@cursor.com>
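The diff-based environment restore described above might look roughly like this (hypothetical `restore_environ`; the dispatcher's actual names may differ):

```python
import os


def restore_environ(original):
    """Restore os.environ to `original` without a global clear(): each key
    transitions at most once, directly to its target value, so concurrent
    readers in the process never observe an empty environment."""
    current = dict(os.environ)
    for key in current.keys() - original.keys():
        del os.environ[key]          # key added during the trial -> remove
    for key, value in original.items():
        if current.get(key) != value:
            os.environ[key] = value  # changed or deleted key -> put back
```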
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
```python
if launch_mode == "single_process" and world_size > 1:
    raise RuntimeError(
        f"Workload '{workload_cls.__name__}' is single_process; "
        f"do not wrap with torchrun (WORLD_SIZE={world_size})"
    )

if launch_mode == "distributed" and world_size < min_world_size:
    raise RuntimeError(
        f"Workload '{workload_cls.__name__}' requires WORLD_SIZE >= {min_world_size} "
        f"(got {world_size}); launch with: "
        f"torchrun --nproc_per_node={min_world_size} -m aorta run ..."
    )
```
```python
execution_env: Environment descriptor as dict. Mirrors the
    :class:`aorta.registry.Environment` shape:
    ``{"name": str, "docker": str | None, "venv": str | None,
    "source_package": str}``. ROCm version, runtime kind, and
    container image digest are NOT part of this block -- they
```
```python
result = TrialResult(
    trial_id="test_0",
    workload="fsdp",
    execution_env={"kind": "local", "name": "local"},
    mitigations_applied=("none",),
```
```python
with open(output_path, "w") as f:
    json.dump(trial_result.to_dict(), f, indent=2)
```
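A minimal round-trip sketch of the persistence pattern above. `MiniTrialResult` is a hypothetical, pared-down stand-in for the real TrialResult (no deep-copy defenses, only a few fields):

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass(frozen=True)
class MiniTrialResult:
    """Hypothetical simplified stand-in for TrialResult."""

    trial_id: str
    workload: str
    result: dict[str, Any] = field(default_factory=dict)
    exit_status: str = "ok"

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "MiniTrialResult":
        return cls(**d)


# Serialize to JSON and reconstruct losslessly.
r = MiniTrialResult("fsdp_t0", "fsdp", {"loss": 1.5})
payload = json.dumps(r.to_dict(), indent=2)
assert MiniTrialResult.from_dict(json.loads(payload)) == r
```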
Implements #148
Summary

aorta run is the universal workload runner that executes trials across workloads, environments, and mitigations. This PR delivers both B1.0 (interface stub) and B1.1 (full implementation) as specified in the issue.

Core Features

- ✅ Library-first API: run_trials(RunRequest) -> list[TrialResult] for in-process use by B2 (triage matrix runner)
- ✅ CLI Interface: aorta run --workload <name> --trials N [options]
- ✅ Workload Discovery: Entry-point based plugin system (aorta.workloads group)
- ✅ Launch Mode Validation: Prevents single_process workloads under torchrun and vice versa
- ✅ Mitigation System: Env-var union with proper override order (mitigations → extra_env)
- ✅ Rank-Aware Persistence: Only rank 0 writes JSON files for distributed workloads
- ✅ Error Resilience: Individual trial failures don't stop the run
- ✅ Environment Restoration: Original env restored after each trial
Implementation Details

Files Created

Source (src/aorta/run/):
- __init__.py - Public API exports (RunRequest, TrialResult, run_trials)
- results.py - TrialResult dataclass with JSON serialization (schema v0.1)
- collectors.py - Collector recipe validation (MVP: validated but no-op)
- dispatcher.py - Core orchestration logic
- discovery.py - Entry-point workload discovery via importlib.metadata
- validation.py - Launch mode validation (single_process vs distributed)
- _stubs.py - Temporary stubs for A1/B3 dependencies

Tests (tests/run/):
- test_results.py - 6 tests for TrialResult serialization
- test_cli_parsing.py - 14 tests for CLI argument parsing
- test_validation.py - 12 tests for launch mode validation
- test_discovery.py - 9 tests for workload entry-point discovery
- test_dispatcher.py - 18 tests for dispatcher logic
- test_integration.py - 15 tests for end-to-end scenarios
- test_stubs.py - 15 tests for stub implementations

Total: 89 tests covering all aspects of the implementation.

Files Modified

- src/aorta/cli/run.py - Replaced stub with full implementation
  - Added --environment, --collect, --extra-env options
  - Changed --dockers (plural) to --environment (single)
  - Thin wrapper that calls run_trials()

TrialResult Schema
Schema version 0.1 (unstable until external consumers pin):

```
{
  "schema_version": "0.1",
  "trial_id": "fsdp_t0",
  "workload": "fsdp",
  "execution_env": {
    "kind": "local",
    "name": "local",
    "image": null,
    "digest": null,
    "venv": null,
    "rocm": null,
    "source_package": "aorta"
  },
  "mitigations_applied": ["none"],
  "config": {"steps": 10},
  "env": { /* EnvSnapshot dict */ },
  "result": { /* WorkloadResult dict */ },
  "wall_clock_sec": 12.5,
  "exit_status": "ok"
}
```

Exit status values: ok, workload_failed, infrastructure_failed, timeout

Dependencies

- A1 (Environment Probe) - Stubbed in _stubs.py
- B3 (Registry) - Stubbed in _stubs.py (supports local environment, none/tf32_off mitigations)

Usage Examples
Testing
Run Unit Tests
Manual Verification
Acceptance Criteria
Per issue #148, all B1.0 and B1.1 criteria are met:

B1.0 (Interface Stub)

- from aorta.run.dispatcher import run_trials, RunRequest imports cleanly
- from aorta.run.results import TrialResult imports cleanly
- aorta run --workload fsdp --collect rocprof --trials 1 parses arguments
- --collect bogus rejects with a clear error listing valid names
- TrialResult round-trips through to_dict()/from_dict() losslessly

B1.1 (Full Implementation)

- aorta run --workload fsdp --trials 2 writes trial_0.json and trial_1.json
- --mitigations tf32_off applies the DISABLE_TF32=1 env var
- Single --environment flag (no --dockers plural)
- extra_env overrides mitigation env vars
- --collect validates names, silently no-ops
- execution_env block populated in JSON
- Rank-aware writes in run_trials()
- Launch-mode validation runs before setup()

Next Steps

- When PR #152 merges: replace _stubs.collect_env with the real A1 implementation.
- When B3 lands: replace _stubs.get_environment/get_mitigation with the B3 registry.
- B2 (triage matrix runner) consumes run_trials() in-process.

Notes
🤖 Generated with Claude Code