Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ requires-python = ">=3.10"
"import-linter~=2.10",
"pytest-deadfixtures~=3.1",
"taplo~=0.9.3",
"gymnasium~=1.2",
]
rl = ["gymnasium~=1.2"]
docs = [
"sphinx~=8.1",
"nvidia-sphinx-theme~=0.0.8",
Expand Down
60 changes: 59 additions & 1 deletion src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import signal
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, List, Optional
from typing import Callable, List, Optional, Protocol, TypeGuard, runtime_checkable
from unittest.mock import Mock

import toml
Expand Down Expand Up @@ -118,6 +118,60 @@ def prepare_installation(
return installables, installer


@runtime_checkable
class CustomTrainingLoopAgent(Protocol):
"""
Agent that drives its own training loop and skips the ``handle_dse_job`` step loop.

Set ``HAS_CUSTOM_TRAINING_LOOP = True`` on the agent class to opt in. Used by
agents (e.g. RLlib-based) whose training loops are not modelled as a sequence
of independent ``select_action`` / ``env.step`` calls.
"""

HAS_CUSTOM_TRAINING_LOOP: bool

def train(self) -> None: ...


def _has_custom_training_loop(agent: object) -> TypeGuard[CustomTrainingLoopAgent]:
"""
Narrow ``agent`` to :class:`CustomTrainingLoopAgent` when it opts into the dispatch path.

Returning :class:`TypeGuard` (instead of plain ``bool``) lets the type checker
treat this predicate like ``isinstance``: callers inside the truthy branch see
``agent`` as a :class:`CustomTrainingLoopAgent`, so ``agent.train()`` type-checks
without ``getattr`` or ``cast``.
"""
return bool(getattr(agent, "HAS_CUSTOM_TRAINING_LOOP", False))


def _run_custom_training_loop(agent: CustomTrainingLoopAgent, agent_type: str) -> int:
"""
Drive an agent's self-contained training loop and return a process-style exit code.

``shutdown()`` runs inside its own ``try/except`` so a faulty teardown cannot
suppress the exit code from ``train()`` nor propagate out of this helper:
``handle_dse_job`` relies on the returned ``rc`` to accumulate ``err |= rc``
and continue with the remaining test runs.
"""
logging.info(f"Agent {agent_type} drives its own training loop; delegating to agent.train().")
rc = 0
try:
agent.train()
except Exception:
logging.exception(f"Custom training loop failed for agent {agent_type}.")
rc = 1
finally:
shutdown = getattr(agent, "shutdown", None)
if callable(shutdown):
try:
shutdown()
except Exception:
logging.exception(f"Shutdown failed for agent {agent_type}.")
rc = 1
return rc


def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
registry = Registry()

Expand Down Expand Up @@ -157,6 +211,10 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:

agent = agent_class(env, agent_config)

if _has_custom_training_loop(agent):
err |= _run_custom_training_loop(agent, agent_type)
continue

for step in range(agent.max_steps):
result = agent.select_action()
if result is None:
Expand Down
2 changes: 2 additions & 0 deletions src/cloudai/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from .base_gym import BaseGym
from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry
from .grid_search import GridSearchAgent
from .gymnasium_adapter import GymnasiumAdapter

__all__ = [
"BaseAgent",
"BaseGym",
"CloudAIGymEnv",
"GridSearchAgent",
"GymnasiumAdapter",
"TrajectoryEntry",
]
7 changes: 4 additions & 3 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ def define_observation_space(self) -> list:
Define the observation space for the environment.

Returns:
list: The observation space.
list: One float slot per agent metric (at least one), giving the correct shape
for adapters that derive ``gymnasium.spaces.Box`` from this output.
"""
return [0.0]
return [0.0] * max(len(self.test_run.test.agent_metrics), 1)

def reset(
self,
Expand All @@ -100,7 +101,7 @@ def reset(
if seed is not None:
lazy.np.random.seed(seed)
self.test_run.current_iteration = 0
observation = [0.0]
observation = self.define_observation_space()
info = {}
return observation, info

Expand Down
165 changes: 165 additions & 0 deletions src/cloudai/configurator/gymnasium_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Any, ClassVar, Optional

from .base_gym import BaseGym

_GYMNASIUM_INSTALL_HINT = "gymnasium is required for GymnasiumAdapter. Install it with: pip install gymnasium"


def _import_gymnasium():
"""
Import gymnasium + numpy lazily; raise a clear, actionable error when absent.

Kept as a single seam so that:
* cloudai installs without ``gymnasium`` continue to work for users that don't
need this adapter (the import is gated behind ``GymnasiumAdapter()``);
* tests can patch this helper to simulate a missing install.
"""
try:
import gymnasium
import numpy as np
from gymnasium import spaces

return gymnasium, spaces, np
except ImportError as exc:
raise ImportError(_GYMNASIUM_INSTALL_HINT) from exc


class GymnasiumAdapter:
"""
Expose a CloudAI :class:`BaseGym` as a standard ``gymnasium.Env``-shaped object.

The adapter:

* builds a ``gymnasium.spaces.Dict`` of ``Discrete`` action spaces over the
*tunable* parameters (those with more than one candidate value), and
injects the *fixed* parameters (single candidate) automatically on every
step so agents never see them.
* converts observations to ``float32`` ``numpy`` arrays sized by
``env.define_observation_space()``.
* returns the gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)``
from :meth:`step` and :meth:`step_raw`.
* keeps ``env.test_run.step`` in sync (1-based) so artifact paths produced by
``CloudAIGymEnv`` match those produced by ``handle_dse_job`` (i.e.
``<scenario>/<test>/<iteration>/<step>/`` for every evaluation), which is
required when a custom training loop (e.g. RLlib) front-ends the env.

``gymnasium`` and ``numpy`` are optional dependencies; importing this module
is cheap, but instantiating the adapter without them raises ``ImportError``.
"""

metadata: ClassVar[dict[str, Any]] = {"render_modes": ["human"]}

def __init__(self, env: BaseGym) -> None:
_, spaces, np = _import_gymnasium()

self._np = np
self._env = env
self._step_count = 0

raw_action_space = env.define_action_space()
self._tunable_params: dict[str, list] = {k: v for k, v in raw_action_space.items() if len(v) > 1}
self._fixed_params: dict[str, Any] = {k: v[0] for k, v in raw_action_space.items() if len(v) == 1}

self.action_space = spaces.Dict(
{name: spaces.Discrete(len(values)) for name, values in self._tunable_params.items()}
)

obs_shape = (len(env.define_observation_space()),)
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=obs_shape, dtype=np.float32)

@property
def unwrapped(self) -> BaseGym:
return self._env

def decode_action(self, action: dict[str, int]) -> dict[str, Any]:
"""
Map discrete action indices back to the original parameter values.

Raises:
ValueError: if ``action`` is missing tunable params, contains unknown keys,
or carries an index outside the discrete range for any tunable param.
"""
self._assert_keys(action.keys(), set(self._tunable_params), "action")
decoded: dict[str, Any] = {}
for name, idx in action.items():
values = self._tunable_params[name]
if not 0 <= idx < len(values):
raise ValueError(f"Action index out of range for '{name}': {idx} (expected 0..{len(values) - 1})")
decoded[name] = values[idx]
return decoded

def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[Any, dict[str, Any]]:
self._step_count = 0
obs, info = self._env.reset(seed=seed, options=options)
return self._as_obs_array(obs), info

def step(self, action: dict[str, int]) -> tuple[Any, float, bool, bool, dict[str, Any]]:
params = {**self._fixed_params, **self.decode_action(action)}
return self._step_with_params(params)

def step_raw(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]:
"""
Step the env with an already-decoded parameter dict; bypasses index decoding.

Raises:
ValueError: if ``params`` does not cover exactly the tunable + fixed param keys.
"""
self._assert_keys(params.keys(), set(self._tunable_params) | set(self._fixed_params), "raw params")
return self._step_with_params(params)

def render(self) -> None:
self._env.render()

@staticmethod
def _assert_keys(received: Any, expected: set[str], ctx: str) -> None:
received_set = set(received)
if received_set == expected:
return
missing = sorted(expected - received_set)
extra = sorted(received_set - expected)
raise ValueError(f"{ctx} keys mismatch; missing={missing}, extra={extra}")

def _step_with_params(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]:
self._sync_underlying_step_counter()
obs, reward, done, info = self._env.step(params)
self._step_count += 1
return self._as_obs_array(obs), float(reward), bool(done), False, info

def _sync_underlying_step_counter(self) -> None:
"""
Mirror ``handle_dse_job``'s 1-based ``test_run.step`` so artifact paths match.

The first step is written under ``…/<iteration>/1/``, matching how
``handle_dse_job`` numbers steps; this keeps reports and trajectory
analysis consistent regardless of whether the env is driven by the
DSE loop or by an external training loop wrapping the adapter.
"""
test_run = getattr(self._env, "test_run", None)
if test_run is not None:
test_run.step = self._step_count + 1

def _as_obs_array(self, obs: Any) -> Any:
return self._np.asarray(obs, dtype=self._np.float32)
Loading