From cc1eb0afa3ef2f8da9cd14102a148eadda1399d4 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Fri, 15 May 2026 16:57:04 -0400 Subject: [PATCH 1/8] feat(cli): support agents with custom training loops in handle_dse_job - Agents that set HAS_CUSTOM_TRAINING_LOOP = True drive their own training loop; handle_dse_job calls agent.train() and skips the per-step env.step loop. - New _run_custom_training_loop helper logs exceptions, returns a process-style exit code, and always invokes agent.shutdown() (when defined) in a finally block so resources are released on both success and failure paths. - CustomTrainingLoopAgent Protocol documents the opt-in contract for type checkers and IDEs. --- src/cloudai/cli/handlers.py | 40 ++++++++++- tests/test_handlers.py | 130 +++++++++++++++++++++++++++++++++++- 2 files changed, 167 insertions(+), 3 deletions(-) diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index 0284fcd9e..49f750529 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -20,7 +20,7 @@ import signal from contextlib import contextmanager from pathlib import Path -from typing import Callable, List, Optional +from typing import Callable, List, Optional, Protocol, runtime_checkable from unittest.mock import Mock import toml @@ -118,6 +118,40 @@ def prepare_installation( return installables, installer +@runtime_checkable +class CustomTrainingLoopAgent(Protocol): + """ + Agent that drives its own training loop and skips the ``handle_dse_job`` step loop. + + Set ``HAS_CUSTOM_TRAINING_LOOP = True`` on the agent class to opt in. Used by + agents (e.g. RLlib-based) whose training loops are not modelled as a sequence + of independent ``select_action`` / ``env.step`` calls. + """ + + HAS_CUSTOM_TRAINING_LOOP: bool + + def train(self) -> None: ... + + +def _has_custom_training_loop(agent: object) -> bool: + return bool(getattr(agent, "HAS_CUSTOM_TRAINING_LOOP", False)) + + +def _run_custom_training_loop(agent: CustomTrainingLoopAgent, agent_type: str) -> int: + """Drive an agent's self-contained training loop and return a process-style exit code.""" + logging.info(f"Agent {agent_type} drives its own training loop; delegating to agent.train().") + try: + agent.train() + return 0 + except Exception: + logging.exception(f"Custom training loop failed for agent {agent_type}.") + return 1 + finally: + shutdown = getattr(agent, "shutdown", None) + if callable(shutdown): + shutdown() + + def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: registry = Registry() @@ -157,6 +191,10 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: agent = agent_class(env, agent_config) + if _has_custom_training_loop(agent): + err |= _run_custom_training_loop(agent, agent_type) + continue + for step in range(agent.max_steps): result = agent.select_action() if result is None: diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 5124186c0..19e4b0eae 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -15,15 +15,22 @@ # limitations under the License. import argparse +import logging from pathlib import Path -from typing import Any, ClassVar, Iterator +from typing import Any, ClassVar, Iterator, Optional from unittest.mock import MagicMock import pandas as pd import pytest from pydantic import Field -from cloudai.cli.handlers import handle_dse_job, verify_system_configs, verify_test_configs, verify_test_scenarios +from cloudai.cli.handlers import ( + _run_custom_training_loop, + handle_dse_job, + verify_system_configs, + verify_test_configs, + verify_test_scenarios, +) from cloudai.core import ( BaseAgent, BaseAgentConfig, @@ -254,3 +261,122 @@ def test_verify_test_scenarios_logs_failure_details(tmp_path: Path, caplog: pyte assert str(broken_scenario) in caplog.text assert "duplicate TOML key 'name'" in caplog.text assert "1 out of 1 test scenarios have issues." in caplog.text + + +class CustomLoopStubAgentConfig(BaseAgentConfig): + pass + + +class CustomLoopStubAgent(BaseAgent): + """Stub agent that opts into the custom-training-loop dispatch path.""" + + HAS_CUSTOM_TRAINING_LOOP: ClassVar[bool] = True + + train_calls: ClassVar[int] = 0 + shutdown_calls: ClassVar[int] = 0 + train_raises: ClassVar[Optional[BaseException]] = None + + def __init__(self, env, config: CustomLoopStubAgentConfig): + self.env = env + self.config = config + self.max_steps = 0 + + @staticmethod + def get_config_class() -> type[CustomLoopStubAgentConfig]: + return CustomLoopStubAgentConfig + + def configure(self, config: dict[str, Any]) -> None: + raise NotImplementedError + + def select_action(self) -> tuple[int, dict[str, Any]]: # pragma: no cover - never called + raise AssertionError("select_action must not be called when HAS_CUSTOM_TRAINING_LOOP is True") + + def update_policy(self, _feedback: dict[str, Any]) -> None: + return + + def train(self) -> None: + CustomLoopStubAgent.train_calls += 1 + if CustomLoopStubAgent.train_raises is not None: + raise CustomLoopStubAgent.train_raises + + def shutdown(self) -> None: + CustomLoopStubAgent.shutdown_calls += 1 + + +@pytest.fixture +def custom_loop_agent_name() -> Iterator[str]: + registry = Registry() + agent_name = "test_handlers_custom_loop_agent" + old_agent = registry.agents_map.get(agent_name) + registry.update_agent(agent_name, CustomLoopStubAgent) + CustomLoopStubAgent.train_calls = 0 + CustomLoopStubAgent.shutdown_calls = 0 + CustomLoopStubAgent.train_raises = None + yield agent_name + CustomLoopStubAgent.train_calls = 0 + CustomLoopStubAgent.shutdown_calls = 0 + CustomLoopStubAgent.train_raises = None + if old_agent is None: + del registry.agents_map[agent_name] + else: + registry.update_agent(agent_name, old_agent) + + +def test_run_custom_training_loop_calls_train_and_shutdown() -> None: + agent = MagicMock() + agent.train = MagicMock() + agent.shutdown = MagicMock() + + assert _run_custom_training_loop(agent, "mock_agent") == 0 + agent.train.assert_called_once_with() + agent.shutdown.assert_called_once_with() + + +def test_run_custom_training_loop_returns_error_and_still_shuts_down( + caplog: pytest.LogCaptureFixture, +) -> None: + agent = MagicMock() + agent.train = MagicMock(side_effect=RuntimeError("boom")) + agent.shutdown = MagicMock() + + with caplog.at_level(logging.ERROR): + assert _run_custom_training_loop(agent, "mock_agent") == 1 + + agent.shutdown.assert_called_once_with() + assert "boom" in caplog.text + + +def test_run_custom_training_loop_tolerates_missing_shutdown() -> None: + agent = MagicMock(spec=["train"]) + agent.train = MagicMock() + + assert _run_custom_training_loop(agent, "mock_agent") == 0 + agent.train.assert_called_once_with() + + +def test_handle_dse_job_dispatches_to_custom_training_loop( + slurm_system: SlurmSystem, + dse_tr: TestRun, + custom_loop_agent_name: str, +) -> None: + dse_tr.test.agent = custom_loop_agent_name + test_scenario = TestScenario(name="test_scenario", test_runs=[dse_tr]) + runner = Runner(mode="dry-run", system=slurm_system, test_scenario=test_scenario) + + assert handle_dse_job(runner, argparse.Namespace(mode="dry-run")) == 0 + assert CustomLoopStubAgent.train_calls == 1 + assert CustomLoopStubAgent.shutdown_calls == 1 + + +def test_handle_dse_job_propagates_custom_loop_failure( + slurm_system: SlurmSystem, + dse_tr: TestRun, + custom_loop_agent_name: str, +) -> None: + CustomLoopStubAgent.train_raises = RuntimeError("training blew up") + dse_tr.test.agent = custom_loop_agent_name + test_scenario = TestScenario(name="test_scenario", test_runs=[dse_tr]) + runner = Runner(mode="dry-run", system=slurm_system, test_scenario=test_scenario) + + assert handle_dse_job(runner, argparse.Namespace(mode="dry-run")) == 1 + assert CustomLoopStubAgent.shutdown_calls == 1 From ede2ae5a7af2b9a9d4763cbcaa79f56b7afaa343 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Fri, 15 May 2026 18:30:15 -0400 Subject: [PATCH 2/8] fix(cli): narrow agent type via TypeGuard in custom-loop dispatch Pyright rejected calling _run_custom_training_loop(agent, ...) because the plain bool predicate did not narrow agent's static type from BaseAgent to CustomTrainingLoopAgent. Return TypeGuard[CustomTrainingLoopAgent] from _has_custom_training_loop so the truthy branch in handle_dse_job sees the opted-in shape and the helper can call agent.train() directly. --- src/cloudai/cli/handlers.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index 49f750529..7d8c33689 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -20,7 +20,7 @@ import signal from contextlib import contextmanager from pathlib import Path -from typing import Callable, List, Optional, Protocol, runtime_checkable +from typing import Callable, List, Optional, Protocol, TypeGuard, runtime_checkable from unittest.mock import Mock import toml @@ -133,7 +133,15 @@ class CustomTrainingLoopAgent(Protocol): def train(self) -> None: ... -def _has_custom_training_loop(agent: object) -> bool: +def _has_custom_training_loop(agent: object) -> TypeGuard[CustomTrainingLoopAgent]: + """ + Narrow ``agent`` to :class:`CustomTrainingLoopAgent` when it opts into the dispatch path. + + Returning :class:`TypeGuard` (instead of plain ``bool``) lets the type checker + treat this predicate like ``isinstance``: callers inside the truthy branch see + ``agent`` as a :class:`CustomTrainingLoopAgent`, so ``agent.train()`` type-checks + without ``getattr`` or ``cast``. + """ return bool(getattr(agent, "HAS_CUSTOM_TRAINING_LOOP", False)) From 9552e5a5ff0bf821136c3a96f00ff2cc4c7faef1 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Mon, 18 May 2026 12:21:47 -0400 Subject: [PATCH 3/8] review: isolate shutdown() failures from the exit-code contract If agent.shutdown() raised from the finally block, Python suppressed the earlier return 0/1 from agent.train() and propagated the exception, breaking the outer test-run loop in handle_dse_job (skipped remaining scenarios, failed to accumulate err |= rc). Wrap shutdown() in its own try/except, log via logging.exception, set rc = 1, and return rc after finally so the helper always honours the (int) -> int contract. Adds tests for shutdown-only failure and combined train+shutdown failure. --- src/cloudai/cli/handlers.py | 20 ++++++++++++++++---- tests/test_handlers.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index 7d8c33689..5f862f270 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -146,18 +146,30 @@ def _has_custom_training_loop(agent: object) -> TypeGuard[CustomTrainingLoopAgen def _run_custom_training_loop(agent: CustomTrainingLoopAgent, agent_type: str) -> int: - """Drive an agent's self-contained training loop and return a process-style exit code.""" + """ + Drive an agent's self-contained training loop and return a process-style exit code. + + ``shutdown()`` runs inside its own ``try/except`` so a faulty teardown cannot + suppress the exit code from ``train()`` nor propagate out of this helper: + ``handle_dse_job`` relies on the returned ``rc`` to accumulate ``err |= rc`` + and continue with the remaining test runs. + """ logging.info(f"Agent {agent_type} drives its own training loop; delegating to agent.train().") + rc = 0 try: agent.train() - return 0 except Exception: logging.exception(f"Custom training loop failed for agent {agent_type}.") - return 1 + rc = 1 finally: shutdown = getattr(agent, "shutdown", None) if callable(shutdown): - shutdown() + try: + shutdown() + except Exception: + logging.exception(f"Shutdown failed for agent {agent_type}.") + rc = 1 + return rc def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 19e4b0eae..fec9f2eff 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -354,6 +354,38 @@ def test_run_custom_training_loop_tolerates_missing_shutdown() -> None: agent.train.assert_called_once_with() +def test_run_custom_training_loop_reports_shutdown_failure( + caplog: pytest.LogCaptureFixture, +) -> None: + """shutdown() raising must not suppress the exit code or propagate the exception.""" + agent = MagicMock() + agent.train = MagicMock() + agent.shutdown = MagicMock(side_effect=RuntimeError("teardown blew up")) + + with caplog.at_level(logging.ERROR): + assert _run_custom_training_loop(agent, "mock_agent") == 1 + + agent.train.assert_called_once_with() + agent.shutdown.assert_called_once_with() + assert "teardown blew up" in caplog.text + + +def test_run_custom_training_loop_reports_combined_train_and_shutdown_failures( + caplog: pytest.LogCaptureFixture, +) -> None: + """When both train() and shutdown() raise, the helper still returns 1 and logs both.""" + agent = MagicMock() + agent.train = MagicMock(side_effect=RuntimeError("training boom")) + agent.shutdown = MagicMock(side_effect=RuntimeError("teardown boom")) + + with caplog.at_level(logging.ERROR): + assert _run_custom_training_loop(agent, "mock_agent") == 1 + + agent.shutdown.assert_called_once_with() + assert "training boom" in caplog.text + assert "teardown boom" in caplog.text + + def test_handle_dse_job_dispatches_to_custom_training_loop( slurm_system: SlurmSystem, dse_tr: TestRun, From 7d2e13b475cf15a30b9f52d5c03c9f19d772db82 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Fri, 15 May 2026 17:13:15 -0400 Subject: [PATCH 4/8] feat(configurator): add GymnasiumAdapter for CloudAI envs - GymnasiumAdapter wraps a CloudAI BaseGym as a gymnasium-compatible env: Dict of Discrete actions over the tunable params, float32 Box observation derived from define_observation_space(), gymnasium 5-tuple from step/step_raw. - Fixed (single-value) params are injected on every step so agents only ever see the tunable subset; step_raw bypasses index decoding and fixed injection for callers that already have a fully-formed parameter dict. - The adapter mirrors handle_dse_job's 1-based test_run.step so artifact paths match whether the env is driven by the DSE loop or an external training loop. - CloudAIGymEnv.define_observation_space() now returns one slot per agent metric (at least one), giving adapters the correct Box shape. - gymnasium is an optional dependency: lazy-imported inside the adapter and exposed via a new rl extra (also added to dev) pinned to ~=1.2. --- pyproject.toml | 2 + src/cloudai/configurator/__init__.py | 2 + src/cloudai/configurator/cloudai_gym.py | 7 +- src/cloudai/configurator/gymnasium_adapter.py | 137 ++++++++++++ tests/test_gymnasium_adapter.py | 205 ++++++++++++++++++ uv.lock | 41 +++- 6 files changed, 390 insertions(+), 4 deletions(-) create mode 100644 src/cloudai/configurator/gymnasium_adapter.py create mode 100644 tests/test_gymnasium_adapter.py diff --git a/pyproject.toml b/pyproject.toml index 398326fc3..99e01ae81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,9 @@ requires-python = ">=3.10" "import-linter~=2.10", "pytest-deadfixtures~=3.1", "taplo~=0.9.3", + "gymnasium~=1.2", ] + rl = ["gymnasium~=1.2"] docs = [ "sphinx~=8.1", "nvidia-sphinx-theme~=0.0.8", diff --git a/src/cloudai/configurator/__init__.py b/src/cloudai/configurator/__init__.py index f05b65c5b..a88432c41 100644 --- a/src/cloudai/configurator/__init__.py +++ b/src/cloudai/configurator/__init__.py @@ -18,11 +18,13 @@ from .base_gym import BaseGym from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry from .grid_search import GridSearchAgent +from .gymnasium_adapter import GymnasiumAdapter __all__ = [ "BaseAgent", "BaseGym", "CloudAIGymEnv", "GridSearchAgent", + "GymnasiumAdapter", "TrajectoryEntry", ] diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index d1bdba1f1..72f030627 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -76,9 +76,10 @@ def define_observation_space(self) -> list: Define the observation space for the environment. Returns: - list: The observation space. + list: One float slot per agent metric (at least one), giving the correct shape + for adapters that derive ``gymnasium.spaces.Box`` from this output. """ - return [0.0] + return [0.0] * max(len(self.test_run.test.agent_metrics), 1) def reset( self, @@ -100,7 +101,7 @@ def reset( if seed is not None: lazy.np.random.seed(seed) self.test_run.current_iteration = 0 - observation = [0.0] + observation = self.define_observation_space() info = {} return observation, info diff --git a/src/cloudai/configurator/gymnasium_adapter.py b/src/cloudai/configurator/gymnasium_adapter.py new file mode 100644 index 000000000..a03f245f7 --- /dev/null +++ b/src/cloudai/configurator/gymnasium_adapter.py @@ -0,0 +1,137 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Any, ClassVar, Optional + +from .base_gym import BaseGym + +_GYMNASIUM_INSTALL_HINT = "gymnasium is required for GymnasiumAdapter. Install it with: pip install gymnasium" + + +def _import_gymnasium(): + """ + Import gymnasium + numpy lazily; raise a clear, actionable error when absent. + + Kept as a single seam so that: + * cloudai installs without ``gymnasium`` continue to work for users that don't + need this adapter (the import is gated behind ``GymnasiumAdapter()``); + * tests can patch this helper to simulate a missing install. + """ + try: + import gymnasium + import numpy as np + from gymnasium import spaces + + return gymnasium, spaces, np + except ImportError as exc: + raise ImportError(_GYMNASIUM_INSTALL_HINT) from exc + + +class GymnasiumAdapter: + """ + Expose a CloudAI :class:`BaseGym` as a standard ``gymnasium.Env``-shaped object. + + The adapter: + + * builds a ``gymnasium.spaces.Dict`` of ``Discrete`` action spaces over the + *tunable* parameters (those with more than one candidate value), and + injects the *fixed* parameters (single candidate) automatically on every + step so agents never see them. + * converts observations to ``float32`` ``numpy`` arrays sized by + ``env.define_observation_space()``. + * returns the gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)`` + from :meth:`step` and :meth:`step_raw`. + * keeps ``env.test_run.step`` in sync (1-based) so artifact paths produced by + ``CloudAIGymEnv`` match those produced by ``handle_dse_job`` (i.e. + ``////`` for every evaluation), which is + required when a custom training loop (e.g. RLlib) front-ends the env. + + ``gymnasium`` and ``numpy`` are optional dependencies; importing this module + is cheap, but instantiating the adapter without them raises ``ImportError``. + """ + + metadata: ClassVar[dict[str, Any]] = {"render_modes": ["human"]} + + def __init__(self, env: BaseGym) -> None: + _, spaces, np = _import_gymnasium() + + self._np = np + self._env = env + self._step_count = 0 + + raw_action_space = env.define_action_space() + self._tunable_params: dict[str, list] = {k: v for k, v in raw_action_space.items() if len(v) > 1} + self._fixed_params: dict[str, Any] = {k: v[0] for k, v in raw_action_space.items() if len(v) == 1} + + self.action_space = spaces.Dict( + {name: spaces.Discrete(len(values)) for name, values in self._tunable_params.items()} + ) + + obs_shape = (len(env.define_observation_space()),) + self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=obs_shape, dtype=np.float32) + + @property + def unwrapped(self) -> BaseGym: + return self._env + + def decode_action(self, action: dict[str, int]) -> dict[str, Any]: + """Map discrete action indices back to the original parameter values.""" + return {name: self._tunable_params[name][idx] for name, idx in action.items()} + + def reset( + self, + *, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[Any, dict[str, Any]]: + self._step_count = 0 + obs, info = self._env.reset(seed=seed, options=options) + return self._as_obs_array(obs), info + + def step(self, action: dict[str, int]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + params = {**self._fixed_params, **self.decode_action(action)} + return self._step_with_params(params) + + def step_raw(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + """Step the env with an already-decoded parameter dict; bypasses index decoding.""" + return self._step_with_params(params) + + def render(self) -> None: + self._env.render() + + def _step_with_params(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + self._sync_underlying_step_counter() + obs, reward, done, info = self._env.step(params) + self._step_count += 1 + return self._as_obs_array(obs), float(reward), bool(done), False, info + + def _sync_underlying_step_counter(self) -> None: + """ + Mirror ``handle_dse_job``'s 1-based ``test_run.step`` so artifact paths match. + + The first step is written under ``…//1/``, matching how + ``handle_dse_job`` numbers steps; this keeps reports and trajectory + analysis consistent regardless of whether the env is driven by the + DSE loop or by an external training loop wrapping the adapter. + """ + test_run = getattr(self._env, "test_run", None) + if test_run is not None: + test_run.step = self._step_count + 1 + + def _as_obs_array(self, obs: Any) -> Any: + return self._np.asarray(obs, dtype=self._np.float32) diff --git a/tests/test_gymnasium_adapter.py b/tests/test_gymnasium_adapter.py new file mode 100644 index 000000000..946493915 --- /dev/null +++ b/tests/test_gymnasium_adapter.py @@ -0,0 +1,205 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any, Optional + +import gymnasium +import numpy as np +import pytest + +from cloudai.configurator import GymnasiumAdapter +from cloudai.configurator.base_gym import BaseGym + + +class _FakeGym(BaseGym): + """Deterministic BaseGym fixture with two tunable params and a 3-dim observation.""" + + def __init__(self) -> None: + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3], "param_b": [10, 20]} + self._observation_space: list[float] = [0.0, 0.0, 0.0] + self.last_action: Optional[dict[str, Any]] = None + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return self._observation_space + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + return [0.0, 0.0, 0.0], {} + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + self.last_action = action + return [1.0, 2.0, 3.0], 0.5, False, {"info": "test"} + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class _FixedParamGym(_FakeGym): + """Adds a single-value parameter that the adapter must treat as fixed.""" + + def define_action_space(self) -> dict[str, Any]: + return {"param_a": [1, 2, 3], "param_b": [10, 20], "fixed_param": [42]} + + +class _GymWithTestRun(_FakeGym): + """Carries a CloudAI-like ``test_run`` so we can verify the step-counter sync.""" + + def __init__(self) -> None: + super().__init__() + self.test_run = SimpleNamespace(step=0) + + +@pytest.fixture +def fake_gym() -> _FakeGym: + return _FakeGym() + + +@pytest.fixture +def adapter(fake_gym: _FakeGym) -> GymnasiumAdapter: + return GymnasiumAdapter(fake_gym) + + +def test_action_space_is_dict_of_discrete(adapter: GymnasiumAdapter) -> None: + assert isinstance(adapter.action_space, gymnasium.spaces.Dict) + assert set(adapter.action_space.spaces) == {"param_a", "param_b"} + + sub_a = adapter.action_space.spaces["param_a"] + sub_b = adapter.action_space.spaces["param_b"] + assert isinstance(sub_a, gymnasium.spaces.Discrete) and sub_a.n == 3 + assert isinstance(sub_b, gymnasium.spaces.Discrete) and sub_b.n == 2 + + +def test_observation_space_shape_matches_env(adapter: GymnasiumAdapter) -> None: + assert isinstance(adapter.observation_space, gymnasium.spaces.Box) + assert adapter.observation_space.shape == (3,) + assert adapter.observation_space.dtype == np.float32 + + +def test_reset_returns_float32_array(adapter: GymnasiumAdapter) -> None: + obs, info = adapter.reset() + assert isinstance(obs, np.ndarray) and obs.dtype == np.float32 and obs.shape == (3,) + np.testing.assert_array_equal(obs, [0.0, 0.0, 0.0]) + assert info == {} + + +def test_step_returns_gymnasium_five_tuple(adapter: GymnasiumAdapter) -> None: + adapter.reset() + obs, reward, terminated, truncated, info = adapter.step({"param_a": 0, "param_b": 1}) + + assert isinstance(obs, np.ndarray) and obs.dtype == np.float32 + np.testing.assert_array_equal(obs, [1.0, 2.0, 3.0]) + assert reward == 0.5 + assert terminated is False + assert truncated is False + assert info == {"info": "test"} + + +def test_decode_action_maps_indices_back_to_values(adapter: GymnasiumAdapter) -> None: + assert adapter.decode_action({"param_a": 0, "param_b": 1}) == {"param_a": 1, "param_b": 20} + + +def test_unwrapped_returns_original_env(fake_gym: _FakeGym, adapter: GymnasiumAdapter) -> None: + assert adapter.unwrapped is fake_gym + + +def test_single_value_params_are_excluded_from_action_space() -> None: + adapter = GymnasiumAdapter(_FixedParamGym()) + + assert set(adapter.action_space.spaces) == {"param_a", "param_b"} + assert adapter._fixed_params == {"fixed_param": 42} + + +def test_step_merges_fixed_params_into_underlying_action() -> None: + gym = _FixedParamGym() + adapter = GymnasiumAdapter(gym) + adapter.reset() + + adapter.step({"param_a": 0, "param_b": 1}) + + assert gym.last_action == {"param_a": 1, "param_b": 20, "fixed_param": 42} + + +def test_step_raw_bypasses_decode_and_fixed_injection() -> None: + gym = _FixedParamGym() + adapter = GymnasiumAdapter(gym) + adapter.reset() + raw = {"param_a": 999, "param_b": 888, "fixed_param": 777} + + obs, _reward, terminated, truncated, _info = adapter.step_raw(raw) + + assert gym.last_action == raw + assert isinstance(obs, np.ndarray) + assert terminated is False + assert truncated is False + + +def test_step_assigns_one_based_step_to_test_run() -> None: + gym = _GymWithTestRun() + adapter = GymnasiumAdapter(gym) + adapter.reset() + + adapter.step({"param_a": 0, "param_b": 1}) + assert gym.test_run.step == 1 + + adapter.step({"param_a": 1, "param_b": 0}) + assert gym.test_run.step == 2 + + +def test_step_raw_also_syncs_test_run_step() -> None: + gym = _GymWithTestRun() + adapter = GymnasiumAdapter(gym) + adapter.reset() + + adapter.step_raw({"param_a": 2, "param_b": 1}) + assert gym.test_run.step == 1 + + +def test_reset_restarts_step_counter() -> None: + gym = _GymWithTestRun() + adapter = GymnasiumAdapter(gym) + adapter.reset() + adapter.step({"param_a": 0, "param_b": 1}) + adapter.step({"param_a": 1, "param_b": 0}) + assert gym.test_run.step == 2 + + adapter.reset() + adapter.step({"param_a": 0, "param_b": 0}) + assert gym.test_run.step == 1 + + +def test_missing_gymnasium_raises_clear_error(monkeypatch: pytest.MonkeyPatch) -> None: + import cloudai.configurator.gymnasium_adapter as mod + + def _raise() -> None: + raise ImportError("pip install gymnasium") + + monkeypatch.setattr(mod, "_import_gymnasium", _raise) + + with pytest.raises(ImportError, match="pip install gymnasium"): + GymnasiumAdapter(_FakeGym()) diff --git a/uv.lock b/uv.lock index c5c9d6748..7852ea314 100644 --- a/uv.lock +++ b/uv.lock @@ -282,6 +282,7 @@ dependencies = [ [package.optional-dependencies] dev = [ { name = "build" }, + { name = "gymnasium" }, { name = "import-linter" }, { name = "pandas-stubs" }, { name = "pre-commit" }, @@ -312,6 +313,9 @@ docs-cms = [ { name = "sphinx-rtd-theme" }, { name = "sphinxcontrib-mermaid" }, ] +rl = [ + { name = "gymnasium" }, +] [package.metadata] requires-dist = [ @@ -320,6 +324,8 @@ requires-dist = [ { name = "bokeh", specifier = "~=3.8" }, { name = "build", marker = "extra == 'dev'", specifier = "~=1.4" }, { name = "click", specifier = "~=8.3" }, + { name = "gymnasium", marker = "extra == 'dev'", specifier = "~=1.2" }, + { name = "gymnasium", marker = "extra == 'rl'", specifier = "~=1.2" }, { name = "huggingface-hub", specifier = "~=1.4" }, { name = "import-linter", marker = "extra == 'dev'", specifier = "~=2.10" }, { name = "jinja2", specifier = "~=3.1.6" }, @@ -350,7 +356,16 @@ requires-dist = [ { name = "vulture", marker = "extra == 'dev'", specifier = "==2.14" }, { name = "websockets", specifier = "~=16.0" }, ] -provides-extras = ["dev", "docs", "docs-cms"] +provides-extras = ["dev", "rl", "docs", "docs-cms"] + +[[package]] +name = "cloudpickle" +version = "3.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330, upload-time = "2025-11-03T09:25:26.604Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228, upload-time = "2025-11-03T09:25:25.534Z" }, +] [[package]] name = "colorama" @@ -674,6 +689,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8a/0e/97c33bf5009bdbac74fd2beace167cab3f978feb69cc36f1ef79360d6c4e/exceptiongroup-1.3.1-py3-none-any.whl", hash = "sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598", size = 16740, upload-time = "2025-11-21T23:01:53.443Z" }, ] +[[package]] +name = "farama-notifications" +version = "0.0.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ec/91/14397890dde30adc4bee6462158933806207bc5dd10d7b4d09d5c33845cf/farama_notifications-0.0.6.tar.gz", hash = "sha256:b19acac4bb41d76e59e03394b5dd165f4761c86fa327f56307a35cbee3b60158", size = 2517, upload-time = "2026-04-24T08:43:57.603Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/f0/21f81892e4ed10f4ec3ef2e7cf8635fb76e7c0907c55d0da66be50094760/farama_notifications-0.0.6-py3-none-any.whl", hash = "sha256:f84839188efa1ce5bb361c2a84881b2dc2c0d0d7fb661ff00421820170930935", size = 2897, upload-time = "2026-04-24T08:43:56.785Z" }, +] + [[package]] name = "fastapi" version = "0.128.6" @@ -884,6 +908,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/48/b2/b096ccce418882fbfda4f7496f9357aaa9a5af1896a9a7f60d9f2b275a06/grpcio-1.78.0-cp314-cp314-win_amd64.whl", hash = "sha256:dce09d6116df20a96acfdbf85e4866258c3758180e8c49845d6ba8248b6d0bbb", size = 4929852, upload-time = "2026-02-06T09:56:45.885Z" }, ] +[[package]] +name = "gymnasium" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cloudpickle" }, + { name = "farama-notifications" }, + { name = "numpy" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4d/ff/14b6880d703dfaca204490979d3254ccd280c99550798993319902873658/gymnasium-1.3.0.tar.gz", hash = "sha256:6939e86e835d6b71b6ba6bfd360487420876deafc79bfb7bacba83a7c446bcf3", size = 830646, upload-time = "2026-04-22T13:47:14.155Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/73/fda6a25f3beeb5e49d74330b44092b9e5a547395ccd478d1103ddcbff1fc/gymnasium-1.3.0-py3-none-any.whl", hash = "sha256:6b8c159a8540dcbcb221722d7efda24d78ebbcbc3bd2ea1c2611aa2a34471fc2", size = 953904, upload-time = "2026-04-22T13:47:12.13Z" }, +] + [[package]] name = "h11" version = "0.16.0" From 0960575b522896c3b5cd417ed9179f7e1282e6c5 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Mon, 18 May 2026 12:24:35 -0400 Subject: [PATCH 5/8] review: fail fast on action / param key mismatches decode_action used to accept partial action dicts and step_raw accepted arbitrary key sets, both silently propagating incomplete params to the underlying env. Validate up front: - decode_action requires keys exactly == self._tunable_params, and each discrete index must be in range. - step_raw requires keys exactly == self._tunable_params | self._fixed_params. A new _assert_keys helper raises ValueError with sorted missing / extra lists so callers see exactly what was wrong. Adds tests for missing key, unknown key, out-of-range index, missing fixed param, and unknown raw key paths. --- src/cloudai/configurator/gymnasium_adapter.py | 34 +++++++++++++++++-- tests/test_gymnasium_adapter.py | 29 ++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/src/cloudai/configurator/gymnasium_adapter.py b/src/cloudai/configurator/gymnasium_adapter.py index a03f245f7..6b1053a85 100644 --- a/src/cloudai/configurator/gymnasium_adapter.py +++ b/src/cloudai/configurator/gymnasium_adapter.py @@ -90,8 +90,21 @@ def unwrapped(self) -> BaseGym: return self._env def decode_action(self, action: dict[str, int]) -> dict[str, Any]: - """Map discrete action indices back to the original parameter values.""" - return {name: self._tunable_params[name][idx] for name, idx in action.items()} + """ + Map discrete action indices back to the original parameter values. + + Raises: + ValueError: if ``action`` is missing tunable params, contains unknown keys, + or carries an index outside the discrete range for any tunable param. + """ + self._assert_keys(action.keys(), set(self._tunable_params), "action") + decoded: dict[str, Any] = {} + for name, idx in action.items(): + values = self._tunable_params[name] + if not 0 <= idx < len(values): + raise ValueError(f"Action index out of range for '{name}': {idx} (expected 0..{len(values) - 1})") + decoded[name] = values[idx] + return decoded def reset( self, @@ -108,12 +121,27 @@ def step(self, action: dict[str, int]) -> tuple[Any, float, bool, bool, dict[str return self._step_with_params(params) def step_raw(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: - """Step the env with an already-decoded parameter dict; bypasses index decoding.""" + """ + Step the env with an already-decoded parameter dict; bypasses index decoding. + + Raises: + ValueError: if ``params`` does not cover exactly the tunable + fixed param keys. + """ + self._assert_keys(params.keys(), set(self._tunable_params) | set(self._fixed_params), "raw params") return self._step_with_params(params) def render(self) -> None: self._env.render() + @staticmethod + def _assert_keys(received: Any, expected: set[str], ctx: str) -> None: + received_set = set(received) + if received_set == expected: + return + missing = sorted(expected - received_set) + extra = sorted(received_set - expected) + raise ValueError(f"{ctx} keys mismatch; missing={missing}, extra={extra}") + def _step_with_params(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: self._sync_underlying_step_counter() obs, reward, done, info = self._env.step(params) diff --git a/tests/test_gymnasium_adapter.py b/tests/test_gymnasium_adapter.py index 946493915..e6ec62b79 100644 --- a/tests/test_gymnasium_adapter.py +++ b/tests/test_gymnasium_adapter.py @@ -203,3 +203,32 @@ def _raise() -> None: with pytest.raises(ImportError, match="pip install gymnasium"): GymnasiumAdapter(_FakeGym()) + + +def test_decode_action_rejects_missing_keys(adapter: GymnasiumAdapter) -> None: + with pytest.raises(ValueError, match=r"missing=\['param_b'\]"): + adapter.decode_action({"param_a": 0}) + + +def test_decode_action_rejects_unknown_keys(adapter: GymnasiumAdapter) -> None: + with pytest.raises(ValueError, match=r"extra=\['bogus'\]"): + adapter.decode_action({"param_a": 0, "param_b": 1, "bogus": 0}) + + +def test_decode_action_rejects_out_of_range_index(adapter: GymnasiumAdapter) -> None: + with pytest.raises(ValueError, match=r"out of range for 'param_a'"): + adapter.decode_action({"param_a": 99, "param_b": 0}) + + +def test_step_raw_rejects_missing_fixed_param() -> None: + adapter = GymnasiumAdapter(_FixedParamGym()) + adapter.reset() + with pytest.raises(ValueError, match=r"missing=\['fixed_param'\]"): + adapter.step_raw({"param_a": 1, "param_b": 10}) + + +def test_step_raw_rejects_unknown_keys() -> None: + adapter = GymnasiumAdapter(_FixedParamGym()) + adapter.reset() + with pytest.raises(ValueError, match=r"extra=\['bogus'\]"): + adapter.step_raw({"param_a": 1, "param_b": 10, "fixed_param": 42, "bogus": 0}) From 7a27fa2f3570545fc83322e73e24c328f46b7e92 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Tue, 26 May 2026 16:06:39 -0700 Subject: [PATCH 6/8] test(configurator): pin env_params in CloudAIGymEnv cache key (TDD red) CloudAIGymEnv.get_cached_trajectory_result(action) keys the trajectory cache on action alone. When a workload declares env_params (e.g. drop_rate) and the agent re-selects the same action under a different env_params sample, the cache returns a stale reward measured under a different env, silently invalidating any domain-randomization workflow. Four tests pin the contract; today two FAIL (bug-exposing), two PASS (back-compat sanity). All 27 pre-existing tests are untouched. FAIL test_cache_miss_when_env_params_differ FAIL test_step_reruns_workload_when_env_params_change PASS test_cache_hit_when_action_and_env_params_match PASS test_cache_hit_when_neither_has_env_params Driver for the follow-up PR that adds env_params as a first-class field on TestDefinition + TrajectoryEntry and rewrites the cache key as (action, env_params). Both unit and integration shapes are covered so the fix can be validated end-to-end through env.step(). --- tests/test_cloudaigym.py | 127 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index ecb9eb0a5..452f3c88e 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -419,3 +419,130 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_ contents = csv_path.read_text().strip().splitlines() assert contents[0] == "step,action,reward,observation" assert contents[-1].startswith("5,") + + +def _seed_cached_entry_with_env_params( + env: CloudAIGymEnv, action: dict[str, object], env_params: dict[str, object] +) -> None: + """Seed env.trajectory with one entry, attaching env_params via object.__setattr__. + + TrajectoryEntry is a frozen dataclass and does not yet declare env_params. + Once the field is added, drop this helper and pass env_params as a kwarg. + """ + entry = TrajectoryEntry(step=1, action=action, reward=0.5, observation=[100.0]) + object.__setattr__(entry, "env_params", env_params) + env.test_run.current_iteration = 0 + env.trajectory = {0: [entry]} + + +def test_cache_miss_when_env_params_differ(base_tr: TestRun, tmp_path: Path) -> None: + """Cache MUST miss when env_params differ, even if action is identical. + + Without this property the agent receives stale rewards on every cache hit + under domain randomization. PPO/DQN/BO all silently train on labels that + do not correspond to the env they were nominally generated under. + """ + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"drop_rate": 0.001}) + + env.test_run.current_env_params = {"drop_rate": 0.01} # type: ignore[attr-defined] + + assert env.get_cached_trajectory_result({"x": 10}) is None, ( + "Cache must include env_params in its key. The current implementation " + "keys on action alone, so trials repeating the same action under a " + "different env_params sample receive a stale cached reward. See " + "env-params-cloudai-corpus-plan.md." + ) + + +def test_cache_hit_when_action_and_env_params_match(base_tr: TestRun, tmp_path: Path) -> None: + """Same action AND same env_params must still HIT the cache.""" + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"drop_rate": 0.001}) + + env.test_run.current_env_params = {"drop_rate": 0.001} # type: ignore[attr-defined] + + result = env.get_cached_trajectory_result({"x": 10}) + assert result is not None and result.step == 1 + + +def test_cache_hit_when_neither_has_env_params(base_tr: TestRun, tmp_path: Path) -> None: + """Workloads without env_params behave exactly as today (back-compat).""" + runner = MagicMock() + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = MagicMock(test_runs=[]) + runner.jobs = {} + runner.testrun_to_job_map = {} + + env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) + env.test_run.current_iteration = 0 + env.trajectory = {0: [TrajectoryEntry(step=1, action={"x": 10}, reward=0.5, observation=[100.0])]} + # Note: neither the cached entry nor test_run carries env_params -> existing behavior. + + result = env.get_cached_trajectory_result({"x": 10}) + assert result is not None and result.step == 1 + + +def test_step_reruns_workload_when_env_params_change( + nemorun: NeMoRunTestDefinition, tmp_path: Path +) -> None: + """Integration: env.step() with same action but different env_params re-runs the workload. + + Counterpart to test_cache_miss_when_env_params_differ but exercising the + full step() flow: increment_step -> apply_params_set -> cache lookup -> + runner.run() -> write_trajectory. + """ + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + tdef.agent_metrics = ["default"] + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + reports={NeMoRunReportGenerationStrategy}, + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs = {} + runner.testrun_to_job_map = {} + runner.shutting_down = False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + action = {"trainer.max_steps": 1000} + fake_obs = iter([[100.0], [50.0]]) + + with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): + env.test_run.step = 0 + env.test_run.current_env_params = {"drop_rate": 0.001} # type: ignore[attr-defined] + obs1, _r1, *_ = env.step(action) + + env.test_run.current_env_params = {"drop_rate": 0.01} # type: ignore[attr-defined] + obs2, _r2, *_ = env.step(action) + + assert runner.run.call_count == 2, ( + "Different env_params between two env.step() calls with the same action " + "must trigger a workload re-run; the cache lookup must miss." + ) + assert obs1 != obs2, "fresh workload run should produce a fresh observation" From 07a29f4310b8204eb4adbb1db3190ae96d0c3ab9 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Tue, 26 May 2026 16:22:31 -0700 Subject: [PATCH 7/8] fix(configurator): make env_params a first-class trial-identity field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Domain randomization broke under the trajectory cache because the cache key was action-only: an agent revisiting the same action under a different env_params sample (e.g. drop_rate) got the *first* trial's reward served back, silently corrupting the reward signal. env.csv was also workload-owned and skipped on cache hits, so it was sparse and mis-aligned with trajectory.csv. This change makes env_params first-class in cloudai so the design works the same way for every agent (PPO, BO, GA, MAB) and every workload: - TestDefinition.env_params: dict[str, EnvParamSpec] - sibling to cmd_args, not part of the action space. - TestRun.current_env_params: dict[str, Any] - per-trial sample attached by the env so the cache key and downstream sinks see it. - TrajectoryEntry gains env_params; the cache key is now (action, current_env_params). - CloudAIGymEnv builds an EnvParamsObserver when env_params is declared. before_step() samples deterministically by (seed, name, trial), stashes the sample on test_run, and appends to env.csv *before* the cache lookup - so env.csv is written on every trial (cache hit or miss) and aligns 1:1 with trajectory.csv by step. - Workloads with no [env_params.*] block pay zero overhead: no observer, no env.csv, identical behaviour to today. Tests: turns the four TDD-red tests from the parent PR green and adds nine unit tests for EnvParamSpec / EnvParamsSampler / CsvSink / EnvParamsObserver plus two integration tests pinning the env.csv ↔ trajectory.csv contract. --- src/cloudai/_core/test_scenario.py | 1 + src/cloudai/configurator/cloudai_gym.py | 48 ++++++- src/cloudai/configurator/env_params.py | 168 ++++++++++++++++++++++++ src/cloudai/models/workload.py | 10 ++ tests/test_cloudaigym.py | 95 ++++++++++++-- tests/test_env_params.py | 122 +++++++++++++++++ 6 files changed, 428 insertions(+), 16 deletions(-) create mode 100644 src/cloudai/configurator/env_params.py create mode 100644 tests/test_env_params.py diff --git a/src/cloudai/_core/test_scenario.py b/src/cloudai/_core/test_scenario.py index 4c768158d..9eabe32c6 100644 --- a/src/cloudai/_core/test_scenario.py +++ b/src/cloudai/_core/test_scenario.py @@ -96,6 +96,7 @@ class TestRun: post_test: Optional[TestScenario] = None reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set) extra_srun_args: str | None = None + current_env_params: dict[str, Any] = field(default_factory=dict) def __hash__(self) -> int: return hash(self.name + self.test.name + str(self.iterations) + str(self.current_iteration)) diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index 72f030627..121b7bc3e 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -19,13 +19,14 @@ import dataclasses import logging from pathlib import Path -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from cloudai.core import METRIC_ERROR, BaseRunner, Registry, TestRun from cloudai.util.lazy_imports import lazy from .base_agent import RewardOverrides from .base_gym import BaseGym +from .env_params import CsvSink, EnvParamsObserver, StepObserver @dataclasses.dataclass(frozen=True) @@ -36,6 +37,7 @@ class TrajectoryEntry: action: dict[str, Any] reward: float observation: list + env_params: dict[str, Any] = dataclasses.field(default_factory=dict) class CloudAIGymEnv(BaseGym): @@ -61,8 +63,27 @@ def __init__(self, test_run: TestRun, runner: BaseRunner, rewards: RewardOverrid self.max_steps = test_run.test.agent_steps self.reward_function = Registry().get_reward_function(test_run.test.agent_reward_function) self.trajectory: dict[int, list[TrajectoryEntry]] = {} + self.observers: List[StepObserver] = self._build_observers() super().__init__() + def _build_observers(self) -> List[StepObserver]: + """ + Construct the per-step observers implied by the TestDefinition. + + Workloads opt in to env_params via a TOML ``[env_params.]`` block; + an empty mapping yields no observers and zero overhead. + """ + observers: List[StepObserver] = [] + if self.test_run.test.env_params: + seed = int((self.test_run.test.agent_config or {}).get("random_seed", 0)) + sink = CsvSink(self._env_csv_path()) + observers.append(EnvParamsObserver(self.test_run.test.env_params, sink, seed)) + return observers + + def _env_csv_path(self) -> Path: + """``env.csv`` lives alongside ``trajectory.csv`` so a plain ``merge`` joins them.""" + return self.trajectory_file_path.parent / "env.csv" + def define_action_space(self) -> Dict[str, list[Any]]: return self.test_run.param_space @@ -121,6 +142,9 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: """ self.test_run = self.test_run.apply_params_set(action) + for observer in self.observers: + observer.before_step(self.test_run) + cached_result = self.get_cached_trajectory_result(action) if cached_result is not None: logging.info( @@ -134,8 +158,11 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: action=action, reward=cached_result.reward, observation=cached_result.observation, + env_params=dict(self.test_run.current_env_params), ) ) + for observer in self.observers: + observer.after_step(self.test_run, cached_result.observation, cached_result.reward) return cached_result.observation, cached_result.reward, False, {} if not self.test_run.test.constraint_check(self.test_run, self.runner.system): @@ -171,9 +198,13 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: action=action, reward=reward, observation=observation, + env_params=dict(self.test_run.current_env_params), ) ) + for observer in self.observers: + observer.after_step(self.test_run, observation, reward) + return observation, reward, False, {} def render(self, mode: str = "human"): @@ -252,8 +283,21 @@ def current_trajectory(self) -> list[TrajectoryEntry]: return self.trajectory.setdefault(self.test_run.current_iteration, []) def get_cached_trajectory_result(self, action: Any) -> TrajectoryEntry | None: + """ + Return a cached entry only when the full trial identity matches. + + Trial identity is ``(action, env_params)``: env-randomized parameters + change the workload's behaviour, so a trial repeating the same action + under a different ``env_params`` sample must miss and re-run. Empty + env_params on both sides is the back-compat path for workloads that + do not declare any ``[env_params.*]`` block. + """ + current_env_params = getattr(self.test_run, "current_env_params", {}) or {} for entry in self.current_trajectory: - if self._values_match_exact(entry.action, action): + if not self._values_match_exact(entry.action, action): + continue + entry_env = getattr(entry, "env_params", {}) or {} + if self._values_match_exact(entry_env, current_env_params): return entry return None diff --git a/src/cloudai/configurator/env_params.py b/src/cloudai/configurator/env_params.py new file mode 100644 index 000000000..c5dfb87fc --- /dev/null +++ b/src/cloudai/configurator/env_params.py @@ -0,0 +1,168 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Domain-randomization primitives for CloudAI DSE. + +An env-randomized parameter is a workload knob whose value the environment +samples per trial (categorical, optional weights). It is sibling to +``cmd_args`` on a ``TestDefinition`` and does not enter the agent's action +space; the policy learns a robust mapping under that variation. + +This module owns the data schema (``EnvParamSpec``), the deterministic +sampler (``EnvParamsSampler``), the persistence interface +(``EnvParamsSink`` + ``CsvSink``) and the per-step observer +(``EnvParamsObserver``). ``CloudAIGymEnv`` consumes these directly so the +artifacts (``env.csv``) and the cache key align 1:1 with ``trajectory.csv`` +regardless of agent (PPO, BO, GA, MAB) or workload. +""" + +from __future__ import annotations + +import csv +import random +from pathlib import Path +from typing import Any, Dict, List, Optional, Protocol, runtime_checkable + +from pydantic import BaseModel, ConfigDict, Field, model_validator +from typing_extensions import Self + + +class EnvParamSpec(BaseModel): + """Specification of one env-randomized parameter (categorical).""" + + model_config = ConfigDict(extra="forbid") + + values: List[Any] = Field( + min_length=2, + description="Candidate values; a single-valued parameter is just a fixed cmd_args entry.", + ) + weights: Optional[List[float]] = Field( + default=None, + description="Optional probability weights aligned with values; uniform if omitted.", + ) + + @model_validator(mode="after") + def _validate_weights(self) -> Self: + if self.weights is None: + return self + if len(self.weights) != len(self.values): + raise ValueError( + f"env_params weights length {len(self.weights)} does not match values length {len(self.values)}" + ) + for w in self.weights: + if w < 0: + raise ValueError(f"env_params weights must be non-negative; got {w}") + if sum(self.weights) <= 0: + raise ValueError("env_params weights must have a positive sum") + return self + + +class EnvParamsSampler: + """ + Per-trial categorical sampler. + + Determinism contract: ``sample(t)`` returns the same dict on every call + (across processes) for the same ``(seed, env_params, t)``. + + Independence contract: each parameter uses an RNG seeded by + ``f"{seed}:{name}:{trial}"`` so adding or removing an unrelated + parameter does not perturb existing parameters' draw sequences. + """ + + def __init__(self, env_params: Dict[str, EnvParamSpec], seed: int) -> None: + self._env_params = env_params + self._seed = seed + + def sample(self, trial: int) -> Dict[str, Any]: + out: Dict[str, Any] = {} + for name, spec in self._env_params.items(): + rng = random.Random(f"{self._seed}:{name}:{trial}") + if spec.weights is not None: + out[name] = rng.choices(spec.values, weights=spec.weights, k=1)[0] + else: + out[name] = rng.choice(spec.values) + return out + + +@runtime_checkable +class EnvParamsSink(Protocol): + """Persist one trial's env_params sample; empty samples must be no-ops.""" + + def write(self, step: int, sample: Dict[str, Any]) -> None: ... + + +class CsvSink: + """ + Append per-trial env_params samples to a step-aligned CSV. + + The CSV mirrors how ``trajectory.csv`` serialises its ``action`` column + (one row per env.step(), sample dict stringified in a single cell) so the + two files align 1:1 on ``step`` and a plain ``merge`` joins them. + """ + + def __init__(self, path: Path) -> None: + self._path = path + + def write(self, step: int, sample: Dict[str, Any]) -> None: + if step < 1: + raise ValueError(f"step must be a positive trial index (cloudai DSE is 1-based); got {step}") + if not sample: + return + new_file = not self._path.exists() + self._path.parent.mkdir(parents=True, exist_ok=True) + with self._path.open("a", newline="") as f: + writer = csv.writer(f) + if new_file: + writer.writerow(("step", "env")) + writer.writerow([step, sample]) + + +@runtime_checkable +class StepObserver(Protocol): + """ + Hook fired by ``CloudAIGymEnv.step()`` around each trial. + + ``before_step`` runs before the cache lookup and before any workload + execution. ``after_step`` runs after the trajectory row is written. + """ + + def before_step(self, test_run: Any) -> None: ... + + def after_step(self, test_run: Any, observation: list, reward: float) -> None: ... + + +class EnvParamsObserver: + """ + StepObserver that samples env_params per step and persists them. + + Pre-step: samples ``test_run.test.env_params`` for ``test_run.step``, + stashes the result on ``test_run.current_env_params`` (so the cache key + and the workload's substitution both see it), and appends a row to + ``env.csv``. Post-step: no-op (trajectory.csv is written by CloudAIGymEnv). + """ + + def __init__(self, env_params: Dict[str, EnvParamSpec], sink: EnvParamsSink, seed: int) -> None: + self._sampler = EnvParamsSampler(env_params, seed=seed) + self._sink = sink + + def before_step(self, test_run: Any) -> None: + sample = self._sampler.sample(test_run.step) + test_run.current_env_params = sample + self._sink.write(test_run.step, sample) + + def after_step(self, test_run: Any, observation: list, reward: float) -> None: + del test_run, observation, reward # no-op; trajectory.csv handled by env diff --git a/src/cloudai/models/workload.py b/src/cloudai/models/workload.py index 34965454a..f5062d3b4 100644 --- a/src/cloudai/models/workload.py +++ b/src/cloudai/models/workload.py @@ -23,6 +23,8 @@ from cloudai.core import GitRepo, Installable, JobStatusResult, PythonExecutable, Registry, System, TestRun +from ..configurator.env_params import EnvParamSpec + class CmdArgs(BaseModel): """Test command arguments.""" @@ -110,6 +112,14 @@ class TestDefinition(BaseModel, ABC): agent_metrics: list[str] = Field(default=["default"]) agent_reward_function: str = "inverse" agent_config: dict[str, Any] | None = Field(default=None, description="Agent configuration.") + env_params: dict[str, EnvParamSpec] = Field( + default_factory=dict, + description=( + "Domain-randomized parameters sampled by the env per trial. Sibling to " + "cmd_args; not part of the agent's action space. CloudAIGymEnv samples, " + "persists to env.csv, and includes them in the trajectory cache key." + ), + ) @property def cmd_args_dict(self) -> Dict[str, Union[str, List[str]]]: diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index 452f3c88e..0027a070d 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -21,6 +21,7 @@ import pytest from cloudai.configurator import CloudAIGymEnv, GridSearchAgent, TrajectoryEntry +from cloudai.configurator.env_params import EnvParamSpec from cloudai.core import BaseRunner, RewardOverrides, Runner, TestRun, TestScenario from cloudai.systems.slurm import SlurmSystem from cloudai.util import flatten_dict @@ -424,13 +425,8 @@ def test_cached_step_appends_trajectory_row(nemorun: NeMoRunTestDefinition, tmp_ def _seed_cached_entry_with_env_params( env: CloudAIGymEnv, action: dict[str, object], env_params: dict[str, object] ) -> None: - """Seed env.trajectory with one entry, attaching env_params via object.__setattr__. - - TrajectoryEntry is a frozen dataclass and does not yet declare env_params. - Once the field is added, drop this helper and pass env_params as a kwarg. - """ - entry = TrajectoryEntry(step=1, action=action, reward=0.5, observation=[100.0]) - object.__setattr__(entry, "env_params", env_params) + """Seed env.trajectory with one entry carrying the given env_params.""" + entry = TrajectoryEntry(step=1, action=action, reward=0.5, observation=[100.0], env_params=env_params) env.test_run.current_iteration = 0 env.trajectory = {0: [entry]} @@ -452,7 +448,7 @@ def test_cache_miss_when_env_params_differ(base_tr: TestRun, tmp_path: Path) -> env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"drop_rate": 0.001}) - env.test_run.current_env_params = {"drop_rate": 0.01} # type: ignore[attr-defined] + env.test_run.current_env_params = {"drop_rate": 0.01} assert env.get_cached_trajectory_result({"x": 10}) is None, ( "Cache must include env_params in its key. The current implementation " @@ -474,7 +470,7 @@ def test_cache_hit_when_action_and_env_params_match(base_tr: TestRun, tmp_path: env = CloudAIGymEnv(test_run=base_tr, runner=runner, rewards=RewardOverrides()) _seed_cached_entry_with_env_params(env, {"x": 10}, env_params={"drop_rate": 0.001}) - env.test_run.current_env_params = {"drop_rate": 0.001} # type: ignore[attr-defined] + env.test_run.current_env_params = {"drop_rate": 0.001} result = env.get_cached_trajectory_result({"x": 10}) assert result is not None and result.step == 1 @@ -498,9 +494,7 @@ def test_cache_hit_when_neither_has_env_params(base_tr: TestRun, tmp_path: Path) assert result is not None and result.step == 1 -def test_step_reruns_workload_when_env_params_change( - nemorun: NeMoRunTestDefinition, tmp_path: Path -) -> None: +def test_step_reruns_workload_when_env_params_change(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: """Integration: env.step() with same action but different env_params re-runs the workload. Counterpart to test_cache_miss_when_env_params_differ but exercising the @@ -535,10 +529,10 @@ def test_step_reruns_workload_when_env_params_change( with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): env.test_run.step = 0 - env.test_run.current_env_params = {"drop_rate": 0.001} # type: ignore[attr-defined] + env.test_run.current_env_params = {"drop_rate": 0.001} obs1, _r1, *_ = env.step(action) - env.test_run.current_env_params = {"drop_rate": 0.01} # type: ignore[attr-defined] + env.test_run.current_env_params = {"drop_rate": 0.01} obs2, _r2, *_ = env.step(action) assert runner.run.call_count == 2, ( @@ -546,3 +540,76 @@ def test_step_reruns_workload_when_env_params_change( "must trigger a workload re-run; the cache lookup must miss." ) assert obs1 != obs2, "fresh workload run should produce a fresh observation" + + +def test_env_csv_is_step_aligned_with_trajectory(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: + """env.csv must have exactly one row per env.step() call, with steps aligned 1:1 to trajectory.csv. + + This pins the corpus-friendly contract: a downstream consumer can + ``pd.merge(traj, env, on="step")`` without losing rows on either side, + independent of whether the trial hit the trajectory cache. + """ + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + tdef.agent_metrics = ["default"] + tdef.env_params = {"drop_rate": EnvParamSpec(values=[0.0, 0.001, 0.01])} + tdef.agent_config = {"random_seed": 42} + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + reports={NeMoRunReportGenerationStrategy}, + ) + test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = test_scenario + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + action_a, action_b = {"trainer.max_steps": 1000}, {"trainer.max_steps": 2000} + fake_obs = iter([[100.0], [50.0], [25.0]]) + + with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): + env.test_run.step = 0 + for step_idx, action in enumerate((action_a, action_b, action_a), start=1): + env.test_run.step = step_idx + env.step(action) + + env_csv = env._env_csv_path() + traj_csv = env.trajectory_file_path + assert env_csv.exists(), "env.csv must be written when env_params is declared" + + env_steps = [int(line.split(",", 1)[0]) for line in env_csv.read_text().strip().splitlines()[1:]] + traj_steps = [int(line.split(",", 1)[0]) for line in traj_csv.read_text().strip().splitlines()[1:]] + assert env_steps == traj_steps == [1, 2, 3], ( + f"step columns must align 1:1 across env.csv ({env_steps}) and trajectory.csv ({traj_steps})" + ) + + +def test_no_env_csv_when_env_params_not_declared(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: + """Workloads without [env_params.*] pay zero overhead: no observer, no env.csv.""" + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + test_run = TestRun( + name="plain_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "plain_tr" / "0", + reports={NeMoRunReportGenerationStrategy}, + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + assert env.observers == [], "no env_params declared -> no per-step observers" + assert not env._env_csv_path().exists() diff --git a/tests/test_env_params.py b/tests/test_env_params.py new file mode 100644 index 000000000..257b61f69 --- /dev/null +++ b/tests/test_env_params.py @@ -0,0 +1,122 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the domain-randomization primitives in cloudai.configurator.env_params.""" + +from __future__ import annotations + +from pathlib import Path +from types import SimpleNamespace +from typing import Any, Dict, List + +import pytest +from pydantic import ValidationError + +from cloudai.configurator.env_params import ( + CsvSink, + EnvParamsObserver, + EnvParamSpec, + EnvParamsSampler, +) + + +class _RecordingSink: + """Test double capturing every (step, sample) pair sent to the sink.""" + + def __init__(self) -> None: + self.calls: List[tuple[int, Dict[str, Any]]] = [] + + def write(self, step: int, sample: Dict[str, Any]) -> None: + self.calls.append((step, dict(sample))) + + +def test_env_param_spec_requires_at_least_two_values() -> None: + with pytest.raises(ValidationError): + EnvParamSpec(values=[0.0]) + + +def test_env_param_spec_rejects_mismatched_weights() -> None: + with pytest.raises(ValidationError): + EnvParamSpec(values=[0.0, 0.1], weights=[1.0]) + + +def test_env_param_spec_rejects_zero_sum_weights() -> None: + with pytest.raises(ValidationError): + EnvParamSpec(values=[0.0, 0.1], weights=[0.0, 0.0]) + + +def test_sampler_is_deterministic_across_calls() -> None: + spec = {"drop_rate": EnvParamSpec(values=[0.0, 0.001, 0.01])} + a = EnvParamsSampler(spec, seed=42) + b = EnvParamsSampler(spec, seed=42) + seq_a = [a.sample(t) for t in range(1, 6)] + seq_b = [b.sample(t) for t in range(1, 6)] + assert seq_a == seq_b, "same (seed, trial) must produce the same draw across instances" + + +def test_sampler_each_param_is_independent() -> None: + """Adding an unrelated parameter must not perturb existing parameters' draws.""" + base = {"drop_rate": EnvParamSpec(values=[0.0, 0.001, 0.01])} + extended = { + "drop_rate": EnvParamSpec(values=[0.0, 0.001, 0.01]), + "latency_ms": EnvParamSpec(values=[1, 5, 10]), + } + a = [EnvParamsSampler(base, seed=7).sample(t)["drop_rate"] for t in range(1, 11)] + b = [EnvParamsSampler(extended, seed=7).sample(t)["drop_rate"] for t in range(1, 11)] + assert a == b, "per-parameter RNG seeding must isolate parameters from each other" + + +def test_csv_sink_skips_empty_samples_and_rejects_zero_step(tmp_path: Path) -> None: + sink = CsvSink(tmp_path / "env.csv") + sink.write(1, {}) # empty -> no-op, no file + assert not (tmp_path / "env.csv").exists() + with pytest.raises(ValueError): + sink.write(0, {"drop_rate": 0.0}) + + +def test_csv_sink_writes_header_then_rows(tmp_path: Path) -> None: + sink = CsvSink(tmp_path / "env.csv") + sink.write(1, {"drop_rate": 0.001}) + sink.write(2, {"drop_rate": 0.01}) + contents = (tmp_path / "env.csv").read_text().strip().splitlines() + assert contents[0] == "step,env" + assert contents[1].startswith("1,") + assert contents[2].startswith("2,") + + +def test_observer_sets_current_env_params_and_persists_sample() -> None: + spec = {"drop_rate": EnvParamSpec(values=[0.0, 0.001, 0.01])} + sink = _RecordingSink() + observer = EnvParamsObserver(spec, sink, seed=42) + test_run = SimpleNamespace(step=3, current_env_params={}) + + observer.before_step(test_run) + + assert "drop_rate" in test_run.current_env_params + assert test_run.current_env_params["drop_rate"] in {0.0, 0.001, 0.01} + assert sink.calls == [(3, dict(test_run.current_env_params))] + + +def test_observer_after_step_is_noop() -> None: + """after_step must not touch test_run or sink; trajectory.csv handles persistence.""" + sink = _RecordingSink() + observer = EnvParamsObserver({}, sink, seed=0) + test_run = SimpleNamespace(step=1, current_env_params={"x": 1}) + + observer.after_step(test_run, observation=[0.0], reward=0.0) + + assert sink.calls == [] + assert test_run.current_env_params == {"x": 1} From d0375b435d430eba2dbfa49d1efea27fae6b6bab Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Tue, 26 May 2026 16:29:45 -0700 Subject: [PATCH 8/8] test(configurator): add cache-hit + declared-env_params integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the most important gap in the env_params test matrix: the cache-HIT path under an observer-driven TestDefinition.env_params. The previous tests covered each axis separately - cache key unit tests for env_params, a step()-level cache-miss integration with manually set current_env_params, and a step-alignment test where all trials were cache misses. None of them proved that a cache HIT still drives the observer (and therefore env.csv) - which is precisely the sparsity contract the fix was about. The new test: - declares env_params on the TestDefinition (real observer wired in), - pre-seeds the trajectory with an entry whose action AND deterministic env_params sample match what the observer will produce for step 1, - calls env.step() once and asserts: (a) runner.run is NOT called (cache short-circuit honored), (b) env.csv gains a row (observer fired before the cache lookup), (c) the new trajectory entry carries the per-trial env_params. This is the end-to-end proof that the env.csv sparsity bug cannot recur, and that env.csv ↔ trajectory.csv stay step-aligned on the cache-hit path too. --- tests/test_cloudaigym.py | 65 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index 0027a070d..e6dca3f51 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -593,6 +593,71 @@ def test_env_csv_is_step_aligned_with_trajectory(nemorun: NeMoRunTestDefinition, ) +def test_step_cache_hit_with_declared_env_params_still_writes_env_csv( + nemorun: NeMoRunTestDefinition, tmp_path: Path +) -> None: + """End-to-end: cache HIT under observer-driven env_params still records env.csv. + + This is the contract that was broken before the fix. With env_params + declared on the TestDefinition, ``CloudAIGymEnv`` must fire its + EnvParamsObserver *before* the cache lookup so every trial - hit or + miss - appends to env.csv and stays step-aligned with trajectory.csv. + Asserts: (a) the workload is NOT re-run (cache short-circuit), (b) + env.csv gains a row, (c) trajectory.csv gains a row carrying the + sampled env_params. + """ + import random as _random + + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + tdef.agent_metrics = ["default"] + tdef.env_params = {"drop_rate": EnvParamSpec(values=[0.0, 0.001, 0.01])} + tdef.agent_config = {"random_seed": 42} + + test_run = TestRun( + name="dr_tr", + test=tdef, + num_nodes=1, + nodes=[], + output_path=tmp_path / "out" / "dr_tr" / "0", + reports={NeMoRunReportGenerationStrategy}, + ) + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + runner.test_scenario = TestScenario(name="dr_scenario", test_runs=[test_run]) + runner.jobs, runner.testrun_to_job_map, runner.shutting_down = {}, {}, False + runner.get_job_output_path.return_value = test_run.output_path + + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + assert env.observers, "TestDefinition.env_params declared -> observer must be built" + + expected_sample = {"drop_rate": _random.Random("42:drop_rate:1").choice([0.0, 0.001, 0.01])} + action = {"trainer.max_steps": 1000} + env.test_run.current_iteration = 0 + env.trajectory = { + 0: [TrajectoryEntry(step=0, action=action, reward=0.42, observation=[0.84], env_params=expected_sample)] + } + env.test_run.step = 1 + + with patch.object(env, "get_observation", side_effect=AssertionError("cache miss path must not run")): + obs, reward, _done, _info = env.step(action) + + runner.run.assert_not_called() + assert reward == 0.42 and obs == [0.84] + + env_csv = env._env_csv_path() + assert env_csv.exists(), "cache HIT must NOT skip the observer; env.csv must record the trial" + env_rows = env_csv.read_text().strip().splitlines() + assert env_rows[0] == "step,env" + assert env_rows[1].startswith("1,"), f"expected step 1 row in env.csv, got {env_rows[1]!r}" + + traj_rows = env.trajectory[0] + assert len(traj_rows) == 2 and traj_rows[-1].env_params == expected_sample, ( + "cache-hit trajectory entry must record the per-trial env_params sample" + ) + + def test_no_env_csv_when_env_params_not_declared(nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: """Workloads without [env_params.*] pay zero overhead: no observer, no env.csv.""" tdef = nemorun.model_copy(deep=True)