Support nvidia dynamo #3868
Conversation
These in-place update cases are allowed, but will result in a broken service:
- Changing top-level service properties (`image`, `env`, etc.), resulting in a redeployment of the router replica.
- Changing `router.type` from `sglang` to `dynamo`.
```python
# ROUTER_NOT_PROVISIONED — router job exists but its internal_ip is not yet
#                          known. The condition is transient; the caller
#                          should defer this worker and retry on the next
#                          pipeline tick (subject to a wait timeout — see
#                          ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS in
#                          jobs_running.py).
#
# ROUTER_FAILED — router job has reached a terminal state
#                 (TERMINATING/TERMINATED/FAILED/ABORTED/DONE).
#                 The condition is permanent; the caller should
#                 stop deferring and terminate this worker with a
#                 clear reason — waiting longer cannot recover the
#                 run because the router will not come back with a
#                 fresh internal_ip.
ROUTER_NOT_PROVISIONED: Dict[str, str] = {}
ROUTER_FAILED: Dict[str, str] = {}
```
(nit) Using empty dicts as sentinel objects looks a bit error-prone: if the caller uses `==` instead of `is`, the code will appear correct and will pass type checks, but won't work as expected.
I'd suggest defining an enum with these two variants instead.
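For illustration, a minimal sketch of such an enum. The name `RouterEnvStatus` appears in a later reply in this thread, but the exact member names and the return-type union here are assumptions, not the merged implementation:

```python
from enum import Enum, auto
from typing import Dict, Union

class RouterEnvStatus(Enum):
    NOT_PROVISIONED = auto()  # transient: defer the worker and retry later
    FAILED = auto()           # permanent: terminate the worker

# The helper could then return either the env dict or a status:
RouterEnvResult = Union[Dict[str, str], RouterEnvStatus]

# Unlike an empty-dict sentinel, an enum member is unambiguous under
# both `is` and `==`, while any empty dict "equals" the sentinel:
assert {} == {}  # the pitfall with dict sentinels
assert RouterEnvStatus.FAILED is RouterEnvStatus.FAILED
assert RouterEnvStatus.FAILED != RouterEnvStatus.NOT_PROVISIONED
```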
```python
def get_router_replica_num(run_spec: RunSpec) -> Optional[int]:
    """Return the global replica_num assigned to the router replica group, or
    None if the run has no router replica group. Used by _fetch_run_model in
    pipeline_tasks/jobs_running.py to load the router replica's job alongside
    the worker's own same-replica siblings, so get_router_env_for_job can see the
    router's status / internal_ip.
    """
    cfg = run_spec.configuration
    if not isinstance(cfg, ServiceConfiguration):
        return None
    global_replica_num = 0
    for group in cfg.replica_groups:
        if group.router is not None:
            return global_replica_num
        assert group.count.min is not None
        global_replica_num += group.count.min
    return None
```
I believe this function will start to return incorrect values after an in-place update. For example, if an in-place update changes `group.count.min` of a worker replica group that precedes the router replica group.
Some alternative ways to determine which jobs to load in `_fetch_run_model`:
- If the run spec includes a dynamo router, load all active jobs (but not terminated ones, because there can be too many). If the router job isn't loaded by that query, then I think we can assume that it is terminated (or that the group is scaled to zero, once that is allowed).
- Duplicate the replica group name as a `JobModel` field and use it as a filter in the database query.
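The staleness described above can be reproduced with a small standalone sketch. The types here are simplified stand-ins for the real replica groups, not the actual dstack models:

```python
# Simplified model of the numbering scheme: each group is a
# (has_router, count_min) pair, and the router's global replica_num is
# the sum of the preceding groups' count.min values.
from typing import List, Optional, Tuple

def router_replica_num(groups: List[Tuple[bool, int]]) -> Optional[int]:
    n = 0
    for has_router, count_min in groups:
        if has_router:
            return n
        n += count_min
    return None

# Before the in-place update: a worker group with min=2, then the router.
assert router_replica_num([(False, 2), (True, 1)]) == 2
# After scaling the worker group to min=3, the function now computes 3,
# but the router job that already exists was created with replica_num 2,
# so lookups keyed on the freshly computed value miss it.
assert router_replica_num([(False, 3), (True, 1)]) == 3
```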
```python
        )
        return None
    if router_env:
        context.job.job_spec.env.update(router_env)
```
(nit) Mutating `job_spec.env` looks a bit hacky. Now `job_spec.env` contains just the user-provided environment in some cases and one extra system variable in others, which makes the code less straightforward.
I would suggest returning `router_env` from this function as a `_StartupContext` field, then passing it through the function calls until it reaches `RunnerClient.submit_job`, where it can be merged with the other env variables.
(`RunnerClient.submit_job` uses the same hack when merging variables, but it creates a copy of the job spec, so the spec mutation doesn't leak to the caller.)
```python
# For runs without a router group (services without one, plus
# all tasks and dev-environments), the helper returns None and we
# fall through to the original single-replica behavior.
spec_res = await session.execute(select(RunModel.run_spec).where(RunModel.id == run_id))
```
(nit) It would be great to avoid this extra query, e.g. by loading the run spec as a join in `_refetch_locked_job_model`.
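A self-contained sketch of the join idea, using in-memory SQLite and simplified stand-in models (the real `JobModel`/`RunModel` schemas differ): one query returns both the job row and its run spec, avoiding the second round-trip.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class RunModel(Base):
    __tablename__ = "runs"
    id = Column(Integer, primary_key=True)
    run_spec = Column(String)

class JobModel(Base):
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True)
    run_id = Column(Integer, ForeignKey("runs.id"))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all([
        RunModel(id=1, run_spec='{"router": "dynamo"}'),
        JobModel(id=10, run_id=1),
    ])
    session.commit()

    # One query fetches the job and its run's spec together.
    job, run_spec = session.execute(
        select(JobModel, RunModel.run_spec)
        .join(RunModel, RunModel.id == JobModel.run_id)
        .where(JobModel.id == 10)
    ).one()

assert run_spec == '{"router": "dynamo"}'
```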
```python
if router_job is None:
    # No router job yet — the run was just submitted and jobs haven't
    # been materialized. Treat as "not provisioned" so the caller defers.
```
(nit) Is this possible? I think at run submission we create the jobs in the same database transaction as the run; see `submit_run()`.
Thanks for pointing that out. The scenario cannot happen. I have updated the comment and now return a status, as below:
```python
if router_job is None:
    # The router's latest submission is in a terminal state and was
    # filtered out by _fetch_run_model's not-terminated predicate.
    return RouterEnvStatus.FAILED
```
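Putting the two statuses together, a caller's dispatch could look like this sketch. The handler, the `None` case, and the timeout constant are illustrative stand-ins (mirroring `ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS` from the earlier comment), not the actual implementation:

```python
from enum import Enum, auto
from typing import Optional

class RouterEnvStatus(Enum):
    NOT_PROVISIONED = auto()
    FAILED = auto()

# Illustrative stand-in for ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS.
WAIT_TIMEOUT_SECONDS = 600

def decide(status: Optional[RouterEnvStatus], waited_seconds: float) -> str:
    if status is RouterEnvStatus.NOT_PROVISIONED:
        # Transient: defer and retry on the next pipeline tick,
        # unless the provisioning wait timeout has elapsed.
        return "terminate" if waited_seconds > WAIT_TIMEOUT_SECONDS else "defer"
    if status is RouterEnvStatus.FAILED:
        # Permanent: waiting longer cannot recover the run.
        return "terminate"
    return "proceed"

assert decide(RouterEnvStatus.NOT_PROVISIONED, 30) == "defer"
assert decide(RouterEnvStatus.FAILED, 0) == "terminate"
```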
A few unit tests for these changes would be useful.