
Support nvidia dynamo#3868

Open
Bihan wants to merge 3 commits into dstackai:master from Bihan:support_nvidia_dynamo

Conversation

@Bihan (Collaborator) commented May 8, 2026

Service Configuration example

type: service
name: dynamo-pd


env:
  - HF_TOKEN
  - MODEL_ID=meta-llama/Llama-3.2-3B-Instruct

replicas: 
  - name: router
    count: 1
    docker: true
    router:
      type: dynamo
    commands:
      # DIND ships docker but not pip — set up a venv for ai-dynamo.
      - apt-get update
      - apt-get install -y python3-dev python3-venv
      - python3 -m venv ~/dyn-venv
      - source ~/dyn-venv/bin/activate
      - pip install -U pip
      - pip install --pre "ai-dynamo[sglang]"
      # Pull Dynamo and start the supporting compose stack (NATS, etcd, ...).
      - git clone https://github.com/ai-dynamo/dynamo.git
      - docker compose -f dynamo/deploy/docker-compose.yml up -d
      # Run the Dynamo frontend (was the commented dev command).
      - |
        python3 -m dynamo.frontend \
          --http-host 0.0.0.0 --http-port 8000 \
          --discovery-backend etcd --router-mode kv \
          --kv-cache-block-size 64
    resources:
      cpu: 4

  # ── prefill worker ─────────────────────────────────────────────────────
  # dstackai/base + Python 3.12 + NVCC (needed for CUDA kernels in sglang).
  - name: prefill
    count: 1..2
    scaling:
      metric: rps
      target: 4 
    python: "3.12"
    nvcc: true
    commands:
      # dstack injected DSTACK_ROUTER_INTERNAL_IP after the router replica
      # was provisioned. Compose the etcd/NATS endpoints from it.
      - export ETCD_ENDPOINTS="http://$DSTACK_ROUTER_INTERNAL_IP:2379"
      - export NATS_SERVER="nats://$DSTACK_ROUTER_INTERNAL_IP:4222"
      - export DYN_SYSTEM_HOST="0.0.0.0"
      - export DYN_SYSTEM_PORT="8000"
      # Wait until the router's etcd and NATS ports are actually accepting
      # connections — having the IP isn't the same as having the services up.
      - |
        until (echo > /dev/tcp/$DSTACK_ROUTER_INTERNAL_IP/2379) 2>/dev/null \
           && (echo > /dev/tcp/$DSTACK_ROUTER_INTERNAL_IP/4222) 2>/dev/null; do
          echo "waiting for etcd/NATS on $DSTACK_ROUTER_INTERNAL_IP..."; sleep 3
        done
      - pip install --pre "ai-dynamo[sglang]"
      - |
        python3 -m dynamo.sglang \
          --model-path $MODEL_ID --served-model-name $MODEL_ID \
          --discovery-backend etcd --host 0.0.0.0 \
          --page-size 64 \
          --disaggregation-mode prefill --disaggregation-transfer-backend nixl
    resources:
      gpu: L4

  # ── decode worker ──────────────────────────────────────────────────────
  - name: decode
    count: 1
    python: "3.12"
    nvcc: true
    commands:
      - export ETCD_ENDPOINTS="http://$DSTACK_ROUTER_INTERNAL_IP:2379"
      - export NATS_SERVER="nats://$DSTACK_ROUTER_INTERNAL_IP:4222"
      - export DYN_SYSTEM_HOST="0.0.0.0"
      - export DYN_SYSTEM_PORT="8000"
      - |
        until (echo > /dev/tcp/$DSTACK_ROUTER_INTERNAL_IP/2379) 2>/dev/null \
           && (echo > /dev/tcp/$DSTACK_ROUTER_INTERNAL_IP/4222) 2>/dev/null; do
          echo "waiting for etcd/NATS on $DSTACK_ROUTER_INTERNAL_IP..."; sleep 3
        done
      - pip install --pre "ai-dynamo[sglang]"
      - |
        python3 -m dynamo.sglang \
          --model-path $MODEL_ID --served-model-name $MODEL_ID \
          --discovery-backend etcd --host 0.0.0.0 \
          --page-size 64 \
          --disaggregation-mode decode --disaggregation-transfer-backend nixl
    resources:
      gpu: L4

port: 8000
model: meta-llama/Llama-3.2-3B-Instruct

probes:
  - type: http
    url: /health
    interval: 15s
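The `until (echo > /dev/tcp/...)` loops in the worker commands implement a plain TCP readiness wait. For readers less familiar with that bash idiom, here is a hedged Python equivalent (a sketch, not part of the PR; the function name and defaults are illustrative):

```python
import socket
import time


def wait_for_ports(host: str, ports: list[int],
                   timeout: float = 300.0, interval: float = 3.0) -> bool:
    """Block until every TCP port on `host` accepts connections, mirroring
    the /dev/tcp loop in the worker commands. Returns False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if all(_port_open(host, p) for p in ports):
            return True
        print(f"waiting for etcd/NATS on {host}...")
        time.sleep(interval)
    return False


def _port_open(host: str, port: int) -> bool:
    # A successful connect means the service is listening; having the
    # router's IP alone does not guarantee etcd/NATS are up yet.
    try:
        with socket.create_connection((host, port), timeout=2):
            return True
    except OSError:
        return False
```

The same two ports as in the config (2379 for etcd, 4222 for NATS) would be passed as the `ports` list.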

@Bihan Bihan requested review from jvstme and r4victor and removed request for r4victor May 8, 2026 17:53
Collaborator

These in-place update cases are allowed, but will result in a broken service:

  • Changing top-level service properties that result in a redeployment of the router replica (image, env, etc.).
  • Changing router.type from sglang to dynamo.

Collaborator Author

Done

Comment on lines +14 to +29
# ROUTER_NOT_PROVISIONED — router job exists but its internal_ip is not yet
#                          known. The condition is transient; the caller
#                          should defer this worker and retry on the next
#                          pipeline tick (subject to a wait timeout — see
#                          ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS in
#                          jobs_running.py).
#
# ROUTER_FAILED — router job has reached a terminal state
#                 (TERMINATING/TERMINATED/FAILED/ABORTED/DONE).
#                 The condition is permanent; the caller should
#                 stop deferring and terminate this worker with a
#                 clear reason — waiting longer cannot recover the
#                 run because the router will not come back with a
#                 fresh internal_ip.
ROUTER_NOT_PROVISIONED: Dict[str, str] = {}
ROUTER_FAILED: Dict[str, str] = {}
Collaborator

(nit) Using empty dicts as sentinel objects looks a bit error-prone — if the caller uses == instead of is, the code will appear to be correct and will pass type checks, but won't work as expected.

I'd suggest defining an enum with these two variants instead.
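The pitfall can be shown concretely: two empty dicts compare equal under `==`, so a caller using the wrong operator silently matches both sentinels. A minimal sketch of the suggested enum replacement (names are illustrative, not the PR's final code):

```python
from enum import Enum, auto


class RouterEnvStatus(Enum):
    """Replaces the empty-dict sentinels with distinct, type-safe variants."""
    NOT_PROVISIONED = auto()  # transient: defer the worker and retry
    FAILED = auto()           # permanent: stop deferring, terminate the worker


# The dict-sentinel pitfall: equality cannot tell the sentinels apart.
ROUTER_NOT_PROVISIONED: dict = {}
ROUTER_FAILED: dict = {}
assert ROUTER_NOT_PROVISIONED == ROUTER_FAILED      # == "matches" — wrong
assert ROUTER_NOT_PROVISIONED is not ROUTER_FAILED  # only identity works

# Enum members are distinct under both == and is.
assert RouterEnvStatus.NOT_PROVISIONED != RouterEnvStatus.FAILED
assert RouterEnvStatus.NOT_PROVISIONED is not RouterEnvStatus.FAILED
```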

Collaborator Author

Done

Comment on lines +163 to +179
def get_router_replica_num(run_spec: RunSpec) -> Optional[int]:
    """Return the global replica_num assigned to the router replica group, or
    None if the run has no router replica group. Used by _fetch_run_model in
    pipeline_tasks/jobs_running.py to load the router replica's job alongside
    the worker's own same-replica siblings, so get_router_env_for_job can see
    the router's status / internal_ip.
    """
    cfg = run_spec.configuration
    if not isinstance(cfg, ServiceConfiguration):
        return None
    global_replica_num = 0
    for group in cfg.replica_groups:
        if group.router is not None:
            return global_replica_num
        assert group.count.min is not None
        global_replica_num += group.count.min
    return None
Collaborator

I believe this function will start to return incorrect values after an in-place update. For example, if an in-place update changes group.count.min of a worker replica group that precedes the router replica group.

Some alternative ways to determine which jobs to load in _fetch_run_model:

  • If the run spec includes a dynamo router, load all active jobs (but not terminated, because there can be too many). And if the router job isn't loaded by that query, then I think we can assume that it is terminated (or that the group is scaled to zero, once that is allowed).
  • Duplicate the replica group name as a JobModel field and use it as a filter in the database query.
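The failure mode and the second alternative can be sketched with a toy model (the group and job structures below are illustrative stand-ins, not dstack's real models): the router's global replica_num is computed positionally from the preceding groups' count.min, so shrinking an earlier group shifts the computed index while existing jobs keep their old numbers. Filtering by a stored group name sidesteps the arithmetic:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Group:
    name: str
    count_min: int
    is_router: bool = False


def router_replica_num(groups: list[Group]) -> Optional[int]:
    """Positional computation, as in get_router_replica_num."""
    n = 0
    for g in groups:
        if g.is_router:
            return n
        n += g.count_min
    return None


before = [Group("prefill", 2), Group("router", 1, is_router=True)]
after = [Group("prefill", 1), Group("router", 1, is_router=True)]  # in-place update

assert router_replica_num(before) == 2
# After the update the computation yields 1, but the already-running
# router job still carries replica_num 2 — the lookup now misses it.
assert router_replica_num(after) == 1

# Duplicating the group name onto each job makes the lookup update-proof.
jobs = [
    {"replica_group": "prefill", "replica_num": 0},
    {"replica_group": "prefill", "replica_num": 1},
    {"replica_group": "router", "replica_num": 2},
]
router_jobs = [j for j in jobs if j["replica_group"] == "router"]
assert router_jobs[0]["replica_num"] == 2
```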

Collaborator Author

Done

    )
    return None
if router_env:
    context.job.job_spec.env.update(router_env)
Collaborator

(nit) Mutating job_spec.env looks a bit hacky. Now job_spec.env contains just the user-provided environment in some cases, and one extra system variable in other cases, which makes the code a bit less straightforward.

I would suggest returning router_env from this function as a _StartupContext field, then passing it through the function calls until it reaches RunnerClient.submit_job, where it can be merged with the other env variables.

(RunnerClient.submit_job uses the same hack when merging variables, but it creates a copy of the job spec, so the spec mutation doesn't leak to the caller)
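The suggested flow — carry router_env through the call chain and merge it only when building the final submission, against a copy of the spec — can be sketched as follows (the class and function names here are illustrative; dstack's real JobSpec and submit path differ):

```python
import copy
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JobSpec:
    env: dict = field(default_factory=dict)


def build_submission_spec(spec: JobSpec, router_env: Optional[dict]) -> JobSpec:
    """Merge system-provided router variables into a *copy* of the spec,
    so the caller's spec keeps only the user-provided environment."""
    merged = copy.deepcopy(spec)
    if router_env:
        merged.env.update(router_env)
    return merged


user_spec = JobSpec(env={"MODEL_ID": "meta-llama/Llama-3.2-3B-Instruct"})
submitted = build_submission_spec(
    user_spec, {"DSTACK_ROUTER_INTERNAL_IP": "10.0.0.5"}
)
assert "DSTACK_ROUTER_INTERNAL_IP" in submitted.env
assert "DSTACK_ROUTER_INTERNAL_IP" not in user_spec.env  # no mutation leaks
```

Merging at the last step keeps job_spec.env's meaning uniform (always exactly the user-provided variables), which is the reviewer's point.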

Collaborator Author

Done

# For runs without a router group (services without one, plus
# all tasks and dev-environments), the helper returns None and we
# fall through to the original single-replica behavior.
spec_res = await session.execute(select(RunModel.run_spec).where(RunModel.id == run_id))
Collaborator

(nit) It would be great to avoid this extra query. For example, by loading the run spec as a join in _refetch_locked_job_model

Comment on lines +220 to +222
if router_job is None:
    # No router job yet — the run was just submitted and jobs haven't
    # been materialized. Treat as "not provisioned" so the caller defers.
Collaborator

(nit) Is this possible? I think at run submission we create the jobs in the same database transaction as the run, see submit_run()

Collaborator Author

Thanks for pointing this out — the scenario cannot happen. I have updated the comment and the returned status as below.

if router_job is None:
    # The router's latest submission is in a terminal state and was
    # filtered out by _fetch_run_model's not-terminated predicate.
    return RouterEnvStatus.FAILED

Collaborator

A few unit tests for these changes could be useful

Collaborator Author

Done
