Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cloudai/configurator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -16,12 +16,13 @@

from .base_agent import BaseAgent
from .base_gym import BaseGym
from .cloudai_gym import CloudAIGymEnv
from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry
from .grid_search import GridSearchAgent

__all__ = [
"BaseAgent",
"BaseGym",
"CloudAIGymEnv",
"GridSearchAgent",
"TrajectoryEntry",
]
96 changes: 78 additions & 18 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import copy
import csv
import dataclasses
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from cloudai.core import METRIC_ERROR, BaseRunner, Registry, TestRun
Expand All @@ -25,6 +27,16 @@
from .base_gym import BaseGym


@dataclasses.dataclass(frozen=True)
class TrajectoryEntry:
    """One recorded step of an agent trajectory.

    Captures the action applied at a step together with the reward and the
    observation it produced. Frozen so entries can be safely cached, compared
    against later actions, and never mutated after being written out.
    """

    step: int  # step number within the current iteration
    action: dict[str, Any]  # parameter set the agent applied at this step
    reward: float  # reward computed from the observation
    observation: list[Any]  # observation values gathered after applying the action

Comment thread
podkidyshev marked this conversation as resolved.

class CloudAIGymEnv(BaseGym):
"""
Custom Gym environment for CloudAI integration.
Expand All @@ -45,6 +57,7 @@ def __init__(self, test_run: TestRun, runner: BaseRunner):
self.runner = runner
self.max_steps = test_run.test.agent_steps
self.reward_function = Registry().get_reward_function(test_run.test.agent_reward_function)
self.trajectory: dict[int, list[TrajectoryEntry]] = {}
super().__init__()

def define_action_space(self) -> Dict[str, list[Any]]:
Expand Down Expand Up @@ -104,6 +117,14 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
"""
self.test_run = self.test_run.apply_params_set(action)

cached_result = self.get_cached_trajectory_result(action)
if cached_result is not None:
logging.info(
"Retrieved cached result from trajectory with reward %s. Skipping step.",
cached_result.reward,
)
return cached_result.observation, cached_result.reward, False, {}
Comment thread
podkidyshev marked this conversation as resolved.

if not self.test_run.test.constraint_check(self.test_run, self.runner.system):
logging.info("Constraint check failed. Skipping step.")
return [-1.0], -1.0, True, {}
Expand Down Expand Up @@ -131,7 +152,14 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
observation = self.get_observation(action)
reward = self.compute_reward(observation)

self.write_trajectory(self.test_run.step, action, reward, observation)
self.write_trajectory(
TrajectoryEntry(
step=self.test_run.step,
action=action,
reward=reward,
observation=observation,
)
)

return observation, reward, False, {}

Expand Down Expand Up @@ -188,25 +216,57 @@ def get_observation(self, action: Any) -> list:
observation.append(v)
return observation

def write_trajectory(self, step: int, action: Any, reward: float, observation: list):
"""
Write the trajectory to a CSV file.

Args:
step (int): The current step number.
action (Any): The action taken by the agent.
reward (float): The reward received for the action.
observation (list): The observation after taking the action.
"""
trajectory_file_path = (
self.runner.scenario_root / self.test_run.name / f"{self.test_run.current_iteration}" / "trajectory.csv"
)
def write_trajectory(self, entry: TrajectoryEntry):
    """Record *entry* in the in-memory trajectory and append it to the per-iteration CSV file."""
    self.current_trajectory.append(entry)

    # Hoist the property access: path is the same for every use in this call.
    path = self.trajectory_file_path
    file_exists = path.exists()
    logging.debug(f"Writing trajectory into {path} (exists: {file_exists})")
    path.parent.mkdir(parents=True, exist_ok=True)

    with open(path, mode="a", newline="") as file:
        writer = csv.writer(file)
        # Emit the header only on first creation of the file.
        if not file_exists:
            writer.writerow(["step", "action", "reward", "observation"])
        writer.writerow([entry.step, entry.action, entry.reward, entry.observation])

@property
def trajectory_file_path(self) -> Path:
    """Path of the trajectory CSV for this test run's current iteration."""
    iteration_dir = self.runner.scenario_root / self.test_run.name / str(self.test_run.current_iteration)
    return iteration_dir / "trajectory.csv"

@property
def current_trajectory(self) -> list[TrajectoryEntry]:
    """Trajectory entries recorded for the current iteration, created empty on first access."""
    iteration = self.test_run.current_iteration
    if iteration not in self.trajectory:
        self.trajectory[iteration] = []
    return self.trajectory[iteration]

def get_cached_trajectory_result(self, action: Any) -> TrajectoryEntry | None:
    """Return the first current-iteration entry whose action matches *action* exactly, or None."""
    matches = (
        entry for entry in self.current_trajectory if self._values_match_exact(entry.action, action)
    )
    return next(matches, None)

@classmethod
def _values_match_exact(cls, left: Any, right: Any) -> bool:
    """Recursively compare two values, requiring identical types at every level.

    Unlike plain ``==``, values of different types never match (e.g. ``1`` vs
    ``1.0``), so a cached action is only reused for a truly identical action.
    """
    if type(left) is not type(right):
        return False

    if isinstance(left, dict):
        # Same key set, and every value matches recursively.
        if set(left) != set(right):
            return False
        return all(cls._values_match_exact(value, right[key]) for key, value in left.items())

    if isinstance(left, (list, tuple)):
        # Same length, and every element matches recursively in order.
        return len(left) == len(right) and all(
            cls._values_match_exact(a, b) for a, b in zip(left, right)
        )

    return left == right
Comment thread
podkidyshev marked this conversation as resolved.
11 changes: 9 additions & 2 deletions src/cloudai/systems/slurm/single_sbatch_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pathlib import Path
from typing import Generator, Optional, cast

from cloudai.configurator.cloudai_gym import CloudAIGymEnv
from cloudai.configurator import CloudAIGymEnv, TrajectoryEntry
from cloudai.core import JobIdRetrievalError, System, TestRun, TestScenario
from cloudai.util import CommandShell, format_time_limit, parse_time_limit

Expand Down Expand Up @@ -212,7 +212,14 @@ def handle_dse(self):
gym = CloudAIGymEnv(next_tr, self)
observation = gym.get_observation({})
reward = gym.compute_reward(observation)
gym.write_trajectory(idx, combination, reward, observation)
gym.write_trajectory(
TrajectoryEntry(
step=idx,
action=combination,
reward=reward,
observation=observation,
)
)
Comment thread
podkidyshev marked this conversation as resolved.

def _submit_test(self, tr: TestRun) -> SlurmJob:
with open(self.scenario_root / "cloudai_sbatch_script.sh", "w") as f:
Expand Down
50 changes: 49 additions & 1 deletion tests/test_cloudaigym.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import pytest

from cloudai.configurator import CloudAIGymEnv, GridSearchAgent
from cloudai.configurator import CloudAIGymEnv, GridSearchAgent, TrajectoryEntry
from cloudai.core import BaseRunner, Runner, TestRun, TestScenario
from cloudai.systems.slurm import SlurmSystem
from cloudai.util import flatten_dict
Expand Down Expand Up @@ -298,3 +298,51 @@ def test_apply_params_set__preserves_installables_state(setup_env: tuple[TestRun
upd_tdef = cast(NIXLBenchTestDefinition, new_tr.test)

assert upd_tdef.docker_image.installed_path == tmp_path


@pytest.mark.parametrize(
("trajectory", "current_iteration", "action", "expected_step"),
[
({}, 0, {"x": 1}, None),
({0: [TrajectoryEntry(1, {"x": 1}, 1, [1])]}, 0, {"x": 1}, 1),
({0: [TrajectoryEntry(1, {"x": 1.0}, 1, [1])]}, 0, {"x": 1}, None),
(
{
0: [
TrajectoryEntry(1, {"x": 1.0}, 1, [1]),
TrajectoryEntry(2, {"x": 1}, 1, [1]),
]
},
0,
{"x": 1},
2,
),
({0: [TrajectoryEntry(1, {"x": 1}, 1, [1])]}, 1, {"x": 1}, None),
({1: [TrajectoryEntry(3, {"x": 1}, 1, [1])]}, 1, {"x": 1}, 3),
],
Comment thread
podkidyshev marked this conversation as resolved.
)
def test_get_cached_trajectory_result(
    base_tr: TestRun,
    tmp_path: Path,
    trajectory: dict[int, list[TrajectoryEntry]],
    current_iteration: int,
    action: dict[str, object],
    expected_step: int | None,
) -> None:
    """Cache lookup matches only within the current iteration, with type-exact action comparison."""
    # Minimal runner stub: the env only needs scenario paths and metadata here.
    runner = MagicMock()
    runner.scenario_root = tmp_path / "scenario"
    runner.system = MagicMock()
    runner.test_scenario = MagicMock(test_runs=[])
    runner.jobs = {}
    runner.testrun_to_job_map = {}
    runner.get_job_output_path.return_value = tmp_path / "scenario" / base_tr.name / "0" / "7"

    env = CloudAIGymEnv(test_run=base_tr, runner=runner)
    # Inject the iteration and pre-seeded trajectory directly instead of running steps.
    env.test_run.current_iteration = current_iteration
    env.trajectory = trajectory

    actual = env.get_cached_trajectory_result(action)
    # A miss returns None; a hit returns the entry whose step identifies it in the parametrized data.
    if actual is None:
        assert expected_step is None
    else:
        assert actual.step == expected_step
70 changes: 69 additions & 1 deletion tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@

import argparse
from typing import Any, ClassVar, Iterator
from unittest.mock import MagicMock

import pandas as pd
import pytest
from pydantic import Field

from cloudai.cli.handlers import handle_dse_job
from cloudai.core import BaseAgent, BaseAgentConfig, Registry, Runner, TestDependency, TestRun, TestScenario
from cloudai.core import (
BaseAgent,
BaseAgentConfig,
Registry,
Runner,
TestDependency,
TestRun,
TestScenario,
)
from cloudai.models.scenario import ReportConfig
from cloudai.reporter import StatusReporter
from cloudai.systems.slurm.slurm_system import SlurmSystem


Expand Down Expand Up @@ -139,3 +151,59 @@ def test_dse_run_uses_agent_config(
assert recorded.knob == expected["knob"]
assert recorded.payload == expected["payload"]
assert recorded.random_seed == expected["random_seed"]


def test_dse_run_cache(base_tr: TestRun, tmp_path, caplog: pytest.LogCaptureFixture):
    """End-to-end DSE run: a repeated candidate is served from the trajectory cache, not re-run."""
    # Candidates [1, 1, 2]: the second step repeats the first action and should hit the cache.
    base_tr.test.cmd_args.candidate = [1, 1, 2]
    base_tr.test.agent = "grid_search"
    base_tr.test.agent_steps = 3

    inner_runner = MagicMock()
    inner_runner.system = MagicMock()
    inner_runner.scenario_root = tmp_path / "scenario"
    inner_runner.test_scenario = TestScenario(name="test_scenario", test_runs=[base_tr])
    inner_runner.jobs = {}
    inner_runner.testrun_to_job_map = {}

    def _job_output_path(tr: TestRun, create: bool = True):
        # Mirror the real layout: <scenario_root>/<name>/<iteration>/<step>.
        output_path = inner_runner.scenario_root / tr.name / f"{tr.current_iteration}" / f"{tr.step}"
        if create:
            output_path.mkdir(parents=True, exist_ok=True)
        return output_path

    inner_runner.get_job_output_path.side_effect = _job_output_path

    runner = MagicMock()
    runner.runner = inner_runner

    trajectory_dir = inner_runner.scenario_root / base_tr.name / f"{base_tr.current_iteration}"

    # run test
    with caplog.at_level("INFO"):
        assert handle_dse_job(runner, argparse.Namespace(mode="dry-run")) == 0

    reporter = StatusReporter(
        inner_runner.system,
        TestScenario(name="test_scenario", test_runs=[base_tr]),
        inner_runner.scenario_root,
        ReportConfig(),
    )
    reporter.load_test_runs()

    # Only 2 real runs for 3 steps: the duplicate action was served from cache,
    # so no output dir for step 2 and no second execution.
    assert inner_runner.run.call_count == 2
    assert (trajectory_dir / "1").exists()
    assert not (trajectory_dir / "2").exists()
    assert (trajectory_dir / "3").exists()
    assert caplog.text.count("Retrieved cached result from") == 1

    # CSV records only the executed steps (1 and 3); the cached step is not re-written.
    actual_trajectory = pd.read_csv(trajectory_dir / "trajectory.csv")
    expected_trajectory = pd.DataFrame(
        data=[
            [1, "{'candidate': 1}", -1.0, "[-1.0]"],
            [3, "{'candidate': 2}", -1.0, "[-1.0]"],
        ],
        columns=["step", "action", "reward", "observation"],
    )
    pd.testing.assert_frame_equal(actual_trajectory, expected_trajectory)

    # The reporter sees the same two executed steps.
    assert [tr.step for tr in reporter.trs] == [1, 3]
Loading