diff --git a/conf/experimental/ai_dynamo/test/sglang.toml b/conf/experimental/ai_dynamo/test/sglang.toml index 67fc999f0..34bc9cbff 100644 --- a/conf/experimental/ai_dynamo/test/sglang.toml +++ b/conf/experimental/ai_dynamo/test/sglang.toml @@ -18,6 +18,7 @@ name = "sglang" description = "sglang backend" test_template_name = "AIDynamo" extra_container_mounts = ["/run/udev:/run/udev"] +dse_excluded_args = ["cmd_args.aiperf_phases"] [cmd_args] docker_image_url = "nvcr.io/nvidia/ai-dynamo/sglang-runtime:1.1.1" @@ -88,6 +89,20 @@ workloads = "aiperf.sh" request-count = 50 synthetic-input-tokens-mean = 300 + [[cmd_args.aiperf_phases]] + name = "round_1" + + [cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + + [[cmd_args.aiperf_phases]] + name = "round_2" + + [cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + [cmd_args.aiperf_accuracy] entrypoint = "aiperf profile" setup-cmd = "python -m pip install --break-system-packages --ignore-installed blinker==1.9.0 && python -m pip install --break-system-packages --upgrade aiperf==0.8.0" diff --git a/conf/experimental/ai_dynamo/test/vllm.toml b/conf/experimental/ai_dynamo/test/vllm.toml index 8a5f3b939..581ecf3e7 100644 --- a/conf/experimental/ai_dynamo/test/vllm.toml +++ b/conf/experimental/ai_dynamo/test/vllm.toml @@ -18,6 +18,7 @@ name = "vLLM" description = "vLLM backend" test_template_name = "AIDynamo" extra_container_mounts = ["/run/udev:/run/udev"] +dse_excluded_args = ["cmd_args.aiperf_phases"] [cmd_args] docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.1.1" @@ -38,6 +39,7 @@ workloads = "aiperf.sh" tensor-parallel-size = 8 pipeline-parallel-size = 1 data-parallel-size = 1 + kv-transfer-config = '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' [cmd_args.dynamo.decode_worker] num-nodes = 1 @@ -50,6 +52,7 @@ workloads = "aiperf.sh" tensor-parallel-size = 8 pipeline-parallel-size = 1 data-parallel-size = 1 + kv-transfer-config = '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' [cmd_args.lmcache_controller] cmd = "lmcache_controller --host 0.0.0.0 --port 9000 --monitor-port 9001" @@ -78,6 +81,20 @@ workloads = "aiperf.sh" request-count = 50 synthetic-input-tokens-mean = 300 + [[cmd_args.aiperf_phases]] + name = "round_1" + + [cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + + [[cmd_args.aiperf_phases]] + name = "round_2" + + [cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + [cmd_args.aiperf_accuracy] entrypoint = "aiperf profile" setup-cmd = "python -m pip install --break-system-packages --upgrade aiperf==0.8.0" diff --git a/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml b/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml index 564311240..f975e784e 100644 --- a/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml +++ b/conf/experimental/ai_dynamo/test_scenario/vllm_lmcache.toml @@ -24,7 +24,7 @@ description = "Self-contained AIDynamo scenario wiring vLLM disaggregated infere test_template_name = "AIDynamo" time_limit = "00:10:00" extra_container_mounts = ["/run/udev:/run/udev"] -dse_excluded_args = ["cmd_args.lmcache.lmcache_worker_ports"] +dse_excluded_args = ["cmd_args.lmcache.lmcache_worker_ports", "cmd_args.aiperf_phases"] [Tests.cmd_args] docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.1.1" @@ -90,6 +90,20 @@ dse_excluded_args = ["cmd_args.lmcache.lmcache_worker_ports"] request-count = 50 synthetic-input-tokens-mean = 300 + [[Tests.cmd_args.aiperf_phases]] + name = "round_1" + + [Tests.cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + + [[Tests.cmd_args.aiperf_phases]] + name = "round_2" + + [Tests.cmd_args.aiperf_phases.args] + concurrency = 4 + request-count = 50 + [Tests.cmd_args.aiperf_accuracy] entrypoint = "aiperf profile" setup-cmd = "python -m pip install --break-system-packages --upgrade aiperf==0.8.0" diff --git a/src/cloudai/workloads/ai_dynamo/__init__.py b/src/cloudai/workloads/ai_dynamo/__init__.py index 5e430068d..57e2eb99e 100644 --- a/src/cloudai/workloads/ai_dynamo/__init__.py +++ b/src/cloudai/workloads/ai_dynamo/__init__.py @@ -15,6 +15,7 @@ # limitations under the License. from .ai_dynamo import ( + AIPERF_COMMANDS_FILE_NAME, LMCACHE_CONFIG_BACKUP_FILE_NAME, LMCACHE_CONFIG_FILE_NAME, AIDynamoArgs, @@ -22,6 +23,7 @@ AIDynamoTestDefinition, AIPerf, AIPerfAccuracy, + AIPerfPhase, GenAIPerf, LMCacheController, WorkerBaseArgs, @@ -32,6 +34,7 @@ from .slurm_command_gen_strategy import AIDynamoSlurmCommandGenStrategy __all__ = [ + "AIPERF_COMMANDS_FILE_NAME", "LMCACHE_CONFIG_BACKUP_FILE_NAME", "LMCACHE_CONFIG_FILE_NAME", "AIDynamoArgs", @@ -42,6 +45,7 @@ "AIDynamoTestDefinition", "AIPerf", "AIPerfAccuracy", + "AIPerfPhase", "GenAIPerf", "LMCacheController", "WorkerBaseArgs", diff --git a/src/cloudai/workloads/ai_dynamo/ai_dynamo.py b/src/cloudai/workloads/ai_dynamo/ai_dynamo.py index 7f8da4165..5c45a149b 100644 --- a/src/cloudai/workloads/ai_dynamo/ai_dynamo.py +++ b/src/cloudai/workloads/ai_dynamo/ai_dynamo.py @@ -42,6 +42,7 @@ from cloudai.systems.slurm import SlurmSystem AIPERF_ARTIFACTS_DIR = "aiperf_artifacts" +AIPERF_COMMANDS_FILE_NAME = "aiperf_commands.json" AIPERF_ACCURACY_ARTIFACTS_DIR = "aiperf_accuracy_artifacts" AIPERF_ACCURACY_RESULTS_CSV = "accuracy_results.csv" LMCACHE_CONFIG_FILE_NAME = "lmcache-config.yaml" @@ -254,6 +255,7 @@ class AIPerf(Workload): name: str = "aiperf" cmd: str = "aiperf profile" script: File = File(Path(__file__).parent.parent / "ai_dynamo/aiperf.sh") + runtime: File = Field(default=File(Path(__file__).parent.parent / "ai_dynamo/runtime/aiperf.py"), exclude=True) setup_cmd: str | None = Field( default=None, serialization_alias="setup-cmd", @@ -267,7 +269,13 @@ class AIPerf(Workload): @property def installables(self) -> list[Installable]: - return [self.script] + return [self.script, self.runtime] + + +class AIPerfPhase(AIPerf): + """Named AIPerf phase that overrides the base AIPerf configuration.""" + + name: str = Field(min_length=1, pattern=r"^[A-Za-z0-9_.-]+$") class AIPerfAccuracy(BaseModel): @@ -324,6 +332,7 @@ class AIDynamoCmdArgs(CmdArgs): lmcache_controller: LMCacheController | None = None genai_perf: GenAIPerf = Field(default_factory=GenAIPerf) aiperf: AIPerf = Field(default_factory=AIPerf) + aiperf_phases: list[AIPerfPhase] | None = None aiperf_accuracy: AIPerfAccuracy | None = None workloads: str = "genai_perf.sh" @@ -341,6 +350,23 @@ def validate_workloads(cls, v: str) -> str: def workloads_list(self) -> list[str]: return [w.strip() for w in self.workloads.split(",")] + @model_validator(mode="after") + def validate_aiperf_phases(self) -> "AIDynamoCmdArgs": + """Validate AIPerf phases.""" + if not self.aiperf_phases: + return self + + seen = set() + duplicates = set() + for phase in self.aiperf_phases: + if phase.name in seen: + duplicates.add(phase.name) + seen.add(phase.name) + if duplicates: + raise ValueError(f"AIPerf phase names must be unique. Duplicates: {sorted(duplicates)}") + + return self + @property def installables(self) -> list[Installable]: return [ diff --git a/src/cloudai/workloads/ai_dynamo/aiperf.sh b/src/cloudai/workloads/ai_dynamo/aiperf.sh index 15cee3a58..476ee3062 100644 --- a/src/cloudai/workloads/ai_dynamo/aiperf.sh +++ b/src/cloudai/workloads/ai_dynamo/aiperf.sh @@ -2,182 +2,8 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES # Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# aiperf.sh — aiperf profile wrapper for ai_dynamo workloads. -# -# Called from ai_dynamo.sh's launch_workload() with: -# bash aiperf.sh --result-dir --model --url --port -# [--cmd ] [--report-name ] [--artifact-dir-name ] [--extra-args ] -# -- ... -# -# Context flags (before --) that are recognised and used: -# --result-dir Directory where artifacts and the final report are written. -# --model HuggingFace model identifier (e.g. Qwen/Qwen3-0.6B). -# --url Base URL of the dynamo.frontend (e.g. http://node01). -# --port HTTP port the dynamo.frontend is listening on. -# --report-name Output CSV name (default: aiperf_report.csv). -# --artifact-dir-name Artifact directory name under --result-dir (default: aiperf_artifacts). -# --cmd Full launch command including subcommand (default: "aiperf profile"). -# --setup-cmd Optional shell command run before launching aiperf. -# --extra-args Raw string appended verbatim after all other flags. -# -# All unrecognised flags (--install-dir, --gpus-per-node, etc.) are silently -# consumed so this script is forward-compatible with launch_workload additions. -# -# Everything after -- is passed directly to the aiperf profile invocation. set -Eeuo pipefail -result_dir="" -model="" -url="http://localhost" -port=8000 -report_name="aiperf_report.csv" -artifact_dir_name="aiperf_artifacts" -cmd="aiperf profile" -setup_cmd="" -declare -a extra_args=() -declare -a profile_args=() - -log() { - echo "[$(date '+%F %T') $(hostname)]: $*" -} - -_parse_aiperf_args() { - while [[ $# -ge 2 ]]; do - case "$1" in - --*) profile_args+=("$1" "$2"); shift 2 ;; - *) shift ;; - esac - done - # Capture a trailing lone boolean flag if present. - # Use if/fi — not [[ ]] && — so set -e does not trigger on a false condition. - if [[ $# -eq 1 && "$1" == --* ]]; then - profile_args+=("$1") - fi -} - -process_args() { - while [[ $# -gt 0 ]]; do - case "$1" in - --result-dir) result_dir="$2"; shift 2 ;; - --model) model="$2"; shift 2 ;; - --url) url="$2"; shift 2 ;; - --port) port="$2"; shift 2 ;; - --report-name) report_name="$2"; shift 2 ;; - --artifact-dir-name) artifact_dir_name="$2"; shift 2 ;; - --cmd) cmd="$2"; shift 2 ;; - --setup-cmd) setup_cmd="$2"; shift 2 ;; - --extra-args) read -ra extra_args <<< "$2"; shift 2 ;; - --) shift; _parse_aiperf_args "$@"; break ;; - --*) if [[ -n "${2:-}" && "${2}" != -* ]]; then shift 2; else shift 1; fi ;; # consume unknown flag; shift 2 only if next arg is a value - *) shift ;; - esac - done - - log "Parsed args: - result_dir: $result_dir - model: $model - url: $url - port: $port - report_name: $report_name - artifact_dir: $artifact_dir_name - cmd: $cmd - setup_cmd: ${setup_cmd:-} - extra_args: ${extra_args[*]:-} - profile_args: ${profile_args[*]:-}" -} - -run_setup_cmd() { - if [[ -z "$setup_cmd" ]]; then - return - fi - - log "Running AIPerf setup command: $setup_cmd" - bash -lc "$setup_cmd" - log "AIPerf setup command complete" -} - -process_results() { - local artifact_dir="$result_dir/$artifact_dir_name" - local csv_path="" - - if [[ -f "$artifact_dir/profile_export_aiperf.csv" ]]; then - csv_path="$artifact_dir/profile_export_aiperf.csv" - else - csv_path=$(find "$artifact_dir" -name "*aiperf*.csv" -print -quit 2>/dev/null || true) - fi - - if [[ -n "$csv_path" ]]; then - cp "$csv_path" "$result_dir/$report_name" - log "aiperf report saved to $result_dir/$report_name" - else - log "ERROR: no CSV found in $artifact_dir — aiperf may not have completed" - exit 1 - fi - -} - -run_aiperf() { - local full_url="$1" - local artifact_dir="$2" - local -a run_cmd=() - read -ra run_cmd <<< "$cmd" - local -a launch_cmd=( - "${run_cmd[@]}" - --model "$model" - --url "$full_url" - --endpoint-type chat - --streaming - --artifact-dir "$artifact_dir" - --no-server-metrics - ) - - log "Launching aiperf: ${run_cmd[*]} --model $model --url $full_url" - - if [[ "${#profile_args[@]}" -gt 0 ]]; then - launch_cmd+=("${profile_args[@]}") - fi - if [[ "${#extra_args[@]}" -gt 0 ]]; then - launch_cmd+=("${extra_args[@]}") - fi - - "${launch_cmd[@]}" - - log "aiperf run complete" -} - -main() { - process_args "$@" - - if [[ -z "$result_dir" ]]; then - log "ERROR: --result-dir is required"; exit 1 - fi - if [[ -z "$model" ]]; then - log "ERROR: --model is required"; exit 1 - fi - - run_setup_cmd - - local full_url="${url}:${port}" - local artifact_dir="$result_dir/$artifact_dir_name" - rm -rf "$artifact_dir" - - run_aiperf "$full_url" "$artifact_dir" - process_results -} - -main "$@" -exit 0 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +exec python3 "${SCRIPT_DIR}/aiperf.py" "$@" diff --git a/src/cloudai/workloads/ai_dynamo/runtime/aiperf.py b/src/cloudai/workloads/ai_dynamo/runtime/aiperf.py new file mode 100644 index 000000000..b5476d571 --- /dev/null +++ b/src/cloudai/workloads/ai_dynamo/runtime/aiperf.py @@ -0,0 +1,81 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Execute generated AIPerf runtime entries.""" + +from __future__ import annotations + +import argparse +import json +import shlex +import shutil +import subprocess +import sys +from pathlib import Path +from typing import Any + + +def log(message: str) -> None: + print(message, flush=True) + + +def substitute_frontend_url(values: list[str], frontend_url: str) -> list[str]: + return [value.replace("{frontend_url}", frontend_url) for value in values] + + +def run_entry(entry: dict[str, Any], frontend_url: str) -> None: + argv = substitute_frontend_url([*entry["cmd"], *entry.get("cli", [])], frontend_url) + output_folder = entry.get("output_folder") + if output_folder: + shutil.rmtree(output_folder, ignore_errors=True) + + log(f"Running {entry['name']}: {shlex.join(argv)}") + log_file = entry.get("log_file") + if log_file: + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + with log_path.open("w", encoding="utf-8") as fp: + subprocess.run(argv, stdout=fp, stderr=subprocess.STDOUT, check=True) + else: + subprocess.run(argv, check=True) + + report_source = entry.get("report_source") + report_file = entry.get("report_file") + if report_source and report_file: + report_path = Path(report_file) + report_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(report_source, report_path) + log(f"AIPerf report saved to {report_path}") + + final_report_file = entry.get("final_report_file") + if final_report_file and report_file: + shutil.copy2(report_file, final_report_file) + log(f"Final AIPerf report saved to {final_report_file}") + + +def parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--commands-file", required=True) + parser.add_argument("--url", required=True) + args, _ = parser.parse_known_args(argv) + return args + + +def main(argv: list[str]) -> int: + try: + args = parse_args(argv) + with Path(args.commands_file).open(encoding="utf-8") as fp: + entries = json.load(fp) + + for entry in entries: + run_entry(entry, args.url) + except Exception as exc: + log(f"ERROR: {exc}") + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py b/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py index 861a4c469..4cbb33823 100644 --- a/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py @@ -14,10 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import logging import shlex from pathlib import Path -from typing import List, cast +from typing import Any, List, cast import yaml from pydantic import BaseModel, TypeAdapter, ValidationError @@ -25,7 +26,15 @@ from cloudai.core import File, GitRepo from cloudai.systems.slurm import SlurmCommandGenStrategy -from .ai_dynamo import LMCACHE_CONFIG_BACKUP_FILE_NAME, LMCACHE_CONFIG_FILE_NAME, AIDynamoTestDefinition +from .ai_dynamo import ( + AIPERF_ARTIFACTS_DIR, + AIPERF_COMMANDS_FILE_NAME, + LMCACHE_CONFIG_BACKUP_FILE_NAME, + LMCACHE_CONFIG_FILE_NAME, + AIDynamoTestDefinition, + AIPerf, + AIPerfPhase, +) class AIDynamoSlurmCommandGenStrategy(SlurmCommandGenStrategy): @@ -109,8 +118,135 @@ def _prepare_lmcache_config(self): (self.test_run.output_path / LMCACHE_CONFIG_FILE_NAME).write_text(config) (self.test_run.output_path / LMCACHE_CONFIG_BACKUP_FILE_NAME).write_text(config) + def _aiperf_config_dict(self, aiperf: AIPerf, *, exclude_unset: bool = False) -> dict[str, Any]: + return aiperf.model_dump( + by_alias=True, + exclude={"args", "name", "repo", "script", "runtime"}, + exclude_none=True, + exclude_unset=exclude_unset, + ) + + def _aiperf_args_dict(self, aiperf: AIPerf, *, exclude_unset: bool = False) -> dict[str, Any]: + return aiperf.args.model_dump(by_alias=True, exclude_none=True, exclude_unset=exclude_unset) + + def _aiperf_args_argv(self, args: dict[str, Any]) -> list[str]: + result = [] + for key, value in args.items(): + result.append(f"--{key}") + if value is not None: + result.append(str(value)) + return result + + def _runtime_result_path(self, path: str) -> str: + if Path(path).is_absolute(): + return path + return f"{self.CONTAINER_MOUNT_OUTPUT}/{path}" + + def _split_extra_args(self, value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(item) for item in value] + return shlex.split(str(value)) + + def _aiperf_phase_manifest_entry(self, base: AIPerf, phase: AIPerfPhase, *, single_phase: bool) -> dict[str, Any]: + base_config = self._aiperf_config_dict(base) + phase_config = self._aiperf_config_dict(phase, exclude_unset=True) + config = {**base_config, **phase_config} + + base_args = self._aiperf_args_dict(base) + phase_args = self._aiperf_args_dict(phase, exclude_unset=True) + args = {**base_args, **phase_args} + + if "artifact-dir-name" not in phase_config: + base_artifact_dir = base_config.get("artifact-dir-name", AIPERF_ARTIFACTS_DIR) + config["artifact-dir-name"] = base_artifact_dir if single_phase else f"{base_artifact_dir}/{phase.name}" + if "report-name" not in phase_config: + base_report_name = base_config.get("report-name", "aiperf_report.csv") + config["report-name"] = base_report_name if single_phase else f"aiperf_{phase.name}_report.csv" + + return { + "name": phase.name, + "config": config, + "profile_args": self._aiperf_args_argv(args), + } + + def _aiperf_entries(self) -> list[dict[str, Any]]: + phases = self.td.cmd_args.aiperf_phases or [AIPerfPhase.model_validate({"name": "aiperf"})] + return [ + self._aiperf_phase_manifest_entry( + self.td.cmd_args.aiperf, + phase, + single_phase=len(phases) == 1, + ) + for phase in phases + ] + + def _aiperf_run_entry(self, entry: dict[str, Any], *, write_phase_log: bool, is_final: bool) -> dict[str, Any]: + config = entry["config"] + artifact_dir_name = config["artifact-dir-name"] + artifact_dir = self._runtime_result_path(artifact_dir_name) + runtime_entry = { + "name": entry["name"], + "cmd": shlex.split(config["cmd"]), + "cli": [ + "--model", + self.td.cmd_args.dynamo.model, + "--url", + f"{{frontend_url}}:{self.td.cmd_args.dynamo.port}", + "--endpoint-type", + "chat", + "--streaming", + "--artifact-dir", + artifact_dir, + "--no-server-metrics", + *entry["profile_args"], + *self._split_extra_args(config.get("extra-args")), + ], + "output_folder": artifact_dir, + "report_source": f"{artifact_dir}/profile_export_aiperf.csv", + "report_file": self._runtime_result_path(config["report-name"]), + } + if write_phase_log: + runtime_entry["log_file"] = self._runtime_result_path(f"aiperf_{entry['name']}.log") + if is_final: + runtime_entry["final_report_file"] = self._runtime_result_path("aiperf_report.csv") + return runtime_entry + + def _aiperf_setup_entry(self, setup_cmd: str) -> dict[str, Any]: + return { + "name": "aiperf_setup", + "cmd": ["bash", "-lc", setup_cmd], + "cli": [], + } + + def _prepare_aiperf_commands(self) -> str | None: + if "aiperf.sh" not in self.td.cmd_args.workloads_list: + return None + + self.test_run.output_path.mkdir(parents=True, exist_ok=True) + entries = self._aiperf_entries() + runtime_entries = [] + setup_cmd = entries[0]["config"].get("setup-cmd") + if setup_cmd: + runtime_entries.append(self._aiperf_setup_entry(setup_cmd)) + + write_phase_logs = len(entries) > 1 + for idx, entry in enumerate(entries): + runtime_entries.append( + self._aiperf_run_entry( + entry, + write_phase_log=write_phase_logs, + is_final=len(entries) > 1 and idx == len(entries) - 1, + ) + ) + + (self.test_run.output_path / AIPERF_COMMANDS_FILE_NAME).write_text(json.dumps(runtime_entries, indent=2)) + return f"{self.CONTAINER_MOUNT_OUTPUT}/{AIPERF_COMMANDS_FILE_NAME}" + def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]: self._prepare_lmcache_config() + aiperf_commands_file = self._prepare_aiperf_commands() if not td.repo.installed_path: raise ValueError("Dynamo repo is not installed") args = [ @@ -146,6 +282,8 @@ def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]: args.extend(self._get_nested_toml_args(td.cmd_args.genai_perf, "--genai_perf-")) args.extend(self._get_nested_toml_args(td.cmd_args.aiperf, "--aiperf-")) + if aiperf_commands_file: + args.append(f"--aiperf-commands-file {aiperf_commands_file}") if td.cmd_args.aiperf_accuracy is not None: args.extend(self._get_nested_toml_args(td.cmd_args.aiperf_accuracy, "--aiperf_accuracy-")) diff --git a/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py b/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py index 0e2f23061..a279297ce 100644 --- a/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py +++ b/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import shlex from pathlib import Path from typing import cast @@ -25,6 +26,7 @@ from cloudai.core import GitRepo from cloudai.systems.slurm import SlurmSystem from cloudai.workloads.ai_dynamo import ( + AIPERF_COMMANDS_FILE_NAME, LMCACHE_CONFIG_BACKUP_FILE_NAME, LMCACHE_CONFIG_FILE_NAME, AIDynamoArgs, @@ -33,6 +35,7 @@ AIDynamoTestDefinition, AIPerf, AIPerfAccuracy, + AIPerfPhase, GenAIPerf, LMCacheController, WorkerBaseArgs, @@ -218,6 +221,97 @@ def test_gen_script_args_contains_custom_aiperf_accuracy_args(strategy: AIDynamo assert f'--aiperf_accuracy-cli "{cli}"' in result +def test_gen_script_args_writes_resolved_aiperf_commands(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf = AIPerf.model_validate( + { + "setup-cmd": "python -m pip install --upgrade aiperf", + "args": { + "concurrency": 2, + "request-count": 50, + "synthetic-input-tokens-mean": 300, + "output-tokens-mean": 500, + }, + } + ) + td.cmd_args.aiperf_phases = [ + AIPerfPhase.model_validate({"name": "round_1", "args": {"concurrency": 1}}), + AIPerfPhase.model_validate({"name": "round_2", "args": {"request-count": 10}}), + ] + + result = strategy._gen_script_args(td) + + assert f"--aiperf-commands-file {strategy.CONTAINER_MOUNT_OUTPUT}/{AIPERF_COMMANDS_FILE_NAME}" in result + entries = json.loads((strategy.test_run.output_path / AIPERF_COMMANDS_FILE_NAME).read_text()) + assert entries[0] == { + "name": "aiperf_setup", + "cmd": ["bash", "-lc", "python -m pip install --upgrade aiperf"], + "cli": [], + } + assert entries[1]["name"] == "round_1" + assert entries[1]["cmd"] == ["aiperf", "profile"] + assert entries[1]["cli"][:9] == [ + "--model", + "model", + "--url", + "{frontend_url}:8000", + "--endpoint-type", + "chat", + "--streaming", + "--artifact-dir", + f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_artifacts/round_1", + ] + assert entries[1]["cli"][-8:] == [ + "--concurrency", + "1", + "--request-count", + "50", + "--synthetic-input-tokens-mean", + "300", + "--output-tokens-mean", + "500", + ] + assert entries[1]["log_file"] == f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_1.log" + assert entries[1]["report_file"] == f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_round_1_report.csv" + assert entries[2]["cli"][-8:] == [ + "--concurrency", + "2", + "--request-count", + "10", + "--synthetic-input-tokens-mean", + "300", + "--output-tokens-mean", + "500", + ] + assert entries[2]["final_report_file"] == f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_report.csv" + + +def test_single_aiperf_phase_keeps_legacy_artifact_defaults(strategy: AIDynamoSlurmCommandGenStrategy) -> None: + td = cast(AIDynamoTestDefinition, strategy.test_run.test) + td.cmd_args.workloads = "aiperf.sh" + td.cmd_args.aiperf_phases = [AIPerfPhase.model_validate({"name": "round_1", "args": {"request-count": 10}})] + + strategy._gen_script_args(td) + + entries = json.loads((strategy.test_run.output_path / AIPERF_COMMANDS_FILE_NAME).read_text()) + assert entries[0]["output_folder"] == f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_artifacts" + assert entries[0]["report_file"] == f"{strategy.CONTAINER_MOUNT_OUTPUT}/aiperf_report.csv" + assert "log_file" not in entries[0] + + +def test_aiperf_phase_names_must_be_unique(cmd_args: AIDynamoCmdArgs) -> None: + with pytest.raises(ValueError, match="AIPerf phase names must be unique"): + AIDynamoCmdArgs( + docker_image_url=cmd_args.docker_image_url, + dynamo=cmd_args.dynamo, + aiperf_phases=[ + AIPerfPhase.model_validate({"name": "round_1"}), + AIPerfPhase.model_validate({"name": "round_1"}), + ], + ) + + def test_gen_script_args_quotes_worker_json_args(strategy: AIDynamoSlurmCommandGenStrategy) -> None: td = cast(AIDynamoTestDefinition, strategy.test_run.test) config = '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' diff --git a/tests/workloads/ai_dynamo/test_runtime_aiperf.py b/tests/workloads/ai_dynamo/test_runtime_aiperf.py new file mode 100644 index 000000000..18045b2a2 --- /dev/null +++ b/tests/workloads/ai_dynamo/test_runtime_aiperf.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import json +import sys +from pathlib import Path + +from cloudai.workloads.ai_dynamo.runtime import aiperf + + +def _write_fake_aiperf(tmp_path: Path) -> Path: + script = tmp_path / "fake_aiperf.py" + script.write_text( + """ +import sys +from pathlib import Path + +artifact_dir = Path(sys.argv[sys.argv.index("--artifact-dir") + 1]) +url = sys.argv[sys.argv.index("--url") + 1] +artifact_dir.mkdir(parents=True, exist_ok=True) +(artifact_dir / "profile_export_aiperf.csv").write_text(f"url\\n{url}\\n", encoding="utf-8") +""".strip(), + encoding="utf-8", + ) + return script + + +def test_runtime_executes_entries_and_copies_final_report(tmp_path: Path) -> None: + fake_aiperf = _write_fake_aiperf(tmp_path) + commands_file = tmp_path / "aiperf_commands.json" + artifact_dir = tmp_path / "aiperf_artifacts" / "round_1" + report_file = tmp_path / "aiperf_round_1_report.csv" + final_report_file = tmp_path / "aiperf_report.csv" + commands_file.write_text( + json.dumps( + [ + { + "name": "round_1", + "cmd": [sys.executable, str(fake_aiperf)], + "cli": [ + "--url", + "{frontend_url}:8000", + "--artifact-dir", + str(artifact_dir), + ], + "output_folder": str(artifact_dir), + "log_file": str(tmp_path / "aiperf_round_1.log"), + "report_source": str(artifact_dir / "profile_export_aiperf.csv"), + "report_file": str(report_file), + "final_report_file": str(final_report_file), + } + ] + ), + encoding="utf-8", + ) + + result = aiperf.main(["--url", "http://frontend", "--commands-file", str(commands_file)]) + + assert result == 0 + assert report_file.read_text(encoding="utf-8") == "url\nhttp://frontend:8000\n" + assert final_report_file.read_text(encoding="utf-8") == "url\nhttp://frontend:8000\n" + assert (tmp_path / "aiperf_round_1.log").is_file()