From c6ebe6d8f74eac47d9d2a679a829bc75bb9c759d Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 2 Jun 2026 17:50:37 -0700 Subject: [PATCH 1/4] feat(agent): route event ingest to agent-proxy via env --- .../main/services/cloud-task/service.test.ts | 53 +++++++++++++++++++ .../src/main/services/cloud-task/service.ts | 29 ++++++++++ packages/agent/src/server/agent-server.ts | 1 + packages/agent/src/server/bin.ts | 4 ++ .../src/server/event-stream-sender.test.ts | 22 ++++++++ .../agent/src/server/event-stream-sender.ts | 10 +++- packages/agent/src/server/types.ts | 3 ++ 7 files changed, 120 insertions(+), 2 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index ab9000a167..c5089a0a71 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -604,6 +604,59 @@ describe("CloudTaskService", () => { ).toBe(true); }); + it("stops without reconnecting when the server emits stream-end on a non-terminal run", async () => { + vi.useFakeTimers(); + + // Run status stays non-terminal the whole time. Pre-durable-contract, a clean EOF on a + // non-terminal run reconnects (see the test above); the stream-end sentinel must override that. + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve( + createSseResponse( + 'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:01Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"hi"}}}\n\nevent: stream-end\ndata: {}\n\n', + ), + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + const hasWatcher = (): boolean => + ( + service as unknown as { watchers: Map } + ).watchers.has("task-1:run-1"); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + // Let the reconnect delay (2s base) elapse; with stream-end honored, none is scheduled. + await vi.advanceTimersByTimeAsync(10_000); + + expect(mockStreamFetch.mock.calls.length).toBe(1); + await waitFor(() => !hasWatcher()); + }); + it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => { vi.useFakeTimers(); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index db99fc923f..5c8987bf5e 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -29,6 +29,11 @@ const EVENT_BATCH_FLUSH_MS = 16; const EVENT_BATCH_MAX_SIZE = 50; const SESSION_LOG_PAGE_LIMIT = 5_000; +// Durable end-of-stream sentinel emitted by the server (and the agent-proxy) once a run's +// event stream is complete. It is the authoritative "no more events, ever" signal — the +// client stops on it without consulting run status (status-unaware durable-stream contract). +const STREAM_END_EVENT_NAME = "stream-end"; + interface SessionLogsPage { entries: StoredLogEntry[]; hasMore: boolean; @@ -109,6 +114,7 @@ interface WatcherState { failed: boolean; needsPostBootstrapReconnect: boolean; needsStopAfterBootstrap: boolean; + streamEnded: boolean; } function watcherKey(taskId: string, runId: string): string { @@ -426,6 +432,7 @@ export class CloudTaskService extends TypedEventEmitter { failed: false, needsPostBootstrapReconnect: false, needsStopAfterBootstrap: false, + streamEnded: false, }; this.watchers.set(key, watcher); @@ -585,6 +592,10 @@ export class CloudTaskService extends TypedEventEmitter { if (!this.applyTaskRunState(watcher, run)) return; if (isTerminalStatus(watcher.lastStatus)) return; + this.emitStatusUpdate(watcher); + } + + private emitStatusUpdate(watcher: WatcherState): void { this.emit(CloudTaskEvent.Update, { taskId: watcher.taskId, runId: watcher.runId, @@ -749,6 +760,14 @@ export class CloudTaskService extends TypedEventEmitter { throw new BackendStreamError(message); } + if (event.event === STREAM_END_EVENT_NAME) { + // The run's stream is durably complete. Mark it so completion stops instead + // of reconnecting, independent of run status. The connection will close + // naturally (clean EOF) right after this sentinel. + watcher.streamEnded = true; + return; + } + // A keepalive or real event proves the transport recovered, so clear the // transport reconnect budget. A keepalive stops here: it does NOT clear the // backend-error budget, since it doesn't prove the stream itself produced @@ -1108,6 +1127,16 @@ export class CloudTaskService extends TypedEventEmitter { ): Promise { const watcher = this.watchers.get(key); if (!watcher) return; + if (watcher.failed) return; + + // Durable end-of-stream: the server signalled this run's stream is complete, so + // stop without polling run status. This is the status-unaware durable-stream contract + // — absent this sentinel we keep reconnecting (below) to ride out transport churn. + if (watcher.streamEnded) { + this.emitStatusUpdate(watcher); + this.stopWatcher(key); + return; + } const { reconnectIfNonTerminal } = options; const run = await this.fetchTaskRun(watcher); diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 8f6551556d..6566e25a0d 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -303,6 +303,7 @@ export class AgentServer { if (config.eventIngestToken) { this.eventStreamSender = new TaskRunEventStreamSender({ apiUrl: config.apiUrl, + eventIngestBaseUrl: config.eventIngestBaseUrl, projectId: config.projectId, taskId: config.taskId, runId: config.runId, diff --git a/packages/agent/src/server/bin.ts b/packages/agent/src/server/bin.ts index c70368ae9a..e474a45a6c 100644 --- a/packages/agent/src/server/bin.ts +++ b/packages/agent/src/server/bin.ts @@ -33,6 +33,9 @@ const envSchema = z.object({ .enum(["low", "medium", "high", "xhigh", "max"]) .optional(), POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(), + // Routes the event-ingest POST to the standalone agent-proxy; other API calls keep using + // POSTHOG_API_URL. Falls back to POSTHOG_API_URL when unset. + POSTHOG_TASK_RUN_EVENT_INGEST_URL: z.url().optional(), POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z .string() .regex( @@ -158,6 +161,7 @@ program port: parseInt(options.port, 10), jwtPublicKey: env.JWT_PUBLIC_KEY, eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN, + eventIngestBaseUrl: env.POSTHOG_TASK_RUN_EVENT_INGEST_URL, eventIngestStreamWindowMs: env.POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS, repositoryPath: options.repositoryPath, diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index 60b97e9cae..11399536e5 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -187,6 +187,28 @@ describe("TaskRunEventStreamSender", () => { ]); }); + it("routes the ingest POST to eventIngestBaseUrl when set, keeping the run path", async () => { + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + const body = await readRequestBody(init); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + eventIngestBaseUrl: "http://agent-proxy:8003/", + }); + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await sender.stop(); + + expect(fetchMock).toHaveBeenCalled(); + const lastCall = fetchMock.mock.calls[fetchMock.mock.calls.length - 1]; + expect(lastCall[0]).toBe( + "http://agent-proxy:8003/api/projects/1/tasks/task-1/runs/run-1/event_stream/", + ); + }); + it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; let activeStreamClosed = false; diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 0620bd758e..134ecc1150 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -8,6 +8,9 @@ import { interface TaskRunEventStreamSenderConfig { apiUrl: string; + // Base URL for the event-ingest POST only. Lets the deployment route ingest to the + // standalone agent-proxy while the rest of the agent's API calls stay on apiUrl. + eventIngestBaseUrl?: string; projectId: number; taskId: string; runId: string; @@ -85,8 +88,11 @@ export class TaskRunEventStreamSender { private bufferRevision = 0; constructor(private readonly config: TaskRunEventStreamSenderConfig) { - const apiUrl = config.apiUrl.replace(/\/$/, ""); - this.ingestUrl = `${apiUrl}/api/projects/${config.projectId}/tasks/${encodeURIComponent( + const ingestBase = (config.eventIngestBaseUrl ?? config.apiUrl).replace( + /\/$/, + "", + ); + this.ingestUrl = `${ingestBase}/api/projects/${config.projectId}/tasks/${encodeURIComponent( config.taskId, )}/runs/${encodeURIComponent(config.runId)}/event_stream/`; this.maxBufferedEvents = diff --git a/packages/agent/src/server/types.ts b/packages/agent/src/server/types.ts index d11cb7748c..777a3f2ff2 100644 --- a/packages/agent/src/server/types.ts +++ b/packages/agent/src/server/types.ts @@ -16,6 +16,9 @@ export interface AgentServerConfig { projectId: number; jwtPublicKey: string; // RS256 public key for JWT verification eventIngestToken?: string; + // Optional base URL for the event-ingest POST only (routes ingest to the agent-proxy); + // all other agent API calls keep using apiUrl. Falls back to apiUrl when unset. + eventIngestBaseUrl?: string; eventIngestStreamWindowMs?: number; mode: AgentMode; taskId: string; From 141af35e97f1dfb3d38d505d2417f8fb2acf02ac Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 2 Jun 2026 17:50:37 -0700 Subject: [PATCH 2/4] feat(cloud-task): honor stream-end sentinel, drop status coupling --- apps/code/src/main/services/cloud-task/service.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index c5089a0a71..e6f3d6eb28 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -645,9 +645,9 @@ describe("CloudTaskService", () => { }); const hasWatcher = (): boolean => - ( - service as unknown as { watchers: Map } - ).watchers.has("task-1:run-1"); + (service as unknown as { watchers: Map }).watchers.has( + "task-1:run-1", + ); await waitFor(() => mockStreamFetch.mock.calls.length === 1); // Let the reconnect delay (2s base) elapse; with stream-end honored, none is scheduled. From 2347200e81bf643420570c602005ebaf3413d27e Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 2 Jun 2026 18:22:01 -0700 Subject: [PATCH 3/4] feat(cloud-task): read live stream via agent-proxy when resolved --- .../main/services/cloud-task/service.test.ts | 73 ++++++++++++++- .../src/main/services/cloud-task/service.ts | 88 ++++++++++++++++--- 2 files changed, 148 insertions(+), 13 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index e6f3d6eb28..cb112c3a7c 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -3,15 +3,21 @@ import { CloudTaskEvent } from "./schemas"; const mockNetFetch = vi.hoisted(() => vi.fn()); const mockStreamFetch = vi.hoisted(() => vi.fn()); +const mockStreamTokenFetch = vi.hoisted(() => vi.fn()); // The service now uses global fetch for BOTH authenticated API calls (JSON) // and SSE streaming. The two used to be distinct (net.fetch vs global fetch). -// To preserve the existing test fixtures, route by URL: /stream/ → stream mock, -// everything else → API mock. +// Route by URL: /stream_token/ → token mock (read-leg resolution), /stream/ → stream mock, +// everything else → API mock. The token mock has a Django-path default so existing fixtures +// (which never set it) are untouched. const fetchRouter = vi.hoisted(() => vi.fn((input: string | Request, init?: RequestInit) => { const url = typeof input === "string" ? input : input.url; - const impl = url.includes("/stream/") ? mockStreamFetch : mockNetFetch; + const impl = url.includes("/stream_token/") + ? mockStreamTokenFetch + : url.includes("/stream/") + ? mockStreamFetch + : mockNetFetch; return impl(input, init); }), ); @@ -97,6 +103,13 @@ describe("CloudTaskService", () => { service = new CloudTaskService(mockAuthService as never); mockNetFetch.mockReset(); mockStreamFetch.mockReset(); + mockStreamTokenFetch.mockReset(); + // Default read-leg resolution: no proxy URL, so the stream reads from Django directly. + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve( + createJsonResponse({ token: "test-token", stream_base_url: null }), + ), + ); mockAuthService.authenticatedFetch.mockReset(); vi.stubGlobal("fetch", fetchRouter); @@ -657,6 +670,60 @@ describe("CloudTaskService", () => { await waitFor(() => !hasWatcher()); }); + it("reads via the agent-proxy with a Bearer token when the server resolves a base url", async () => { + vi.useFakeTimers(); + + mockStreamTokenFetch.mockImplementation(() => + Promise.resolve( + createJsonResponse({ + token: "proxy-token", + stream_base_url: "https://proxy.example", + }), + ), + ); + + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + return Promise.resolve( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ); + }); + + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => mockStreamFetch.mock.calls.length === 1); + + const [calledUrl, init] = mockStreamFetch.mock.calls[0]; + expect(String(calledUrl)).toMatch( + /^https:\/\/proxy\.example\/api\/projects\/2\/tasks\/task-1\/runs\/run-1\/stream\/(\?|$)/, + ); + expect((init?.headers as Record)?.Authorization).toBe( + "Bearer proxy-token", + ); + }); + it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => { vi.useFakeTimers(); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 5c8987bf5e..ff6645453b 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -115,6 +115,11 @@ interface WatcherState { needsPostBootstrapReconnect: boolean; needsStopAfterBootstrap: boolean; streamEnded: boolean; + // Read-leg routing, resolved once from the stream_token endpoint and reused across reconnects. + // streamBaseUrl set => read via the agent-proxy with streamReadToken; null => read from Django. + streamTargetResolved: boolean; + streamBaseUrl: string | null; + streamReadToken: string | null; } function watcherKey(taskId: string, runId: string): string { @@ -433,6 +438,9 @@ export class CloudTaskService extends TypedEventEmitter { needsPostBootstrapReconnect: false, needsStopAfterBootstrap: false, streamEnded: false, + streamTargetResolved: false, + streamBaseUrl: null, + streamReadToken: null, }; this.watchers.set(key, watcher); @@ -618,8 +626,28 @@ export class CloudTaskService extends TypedEventEmitter { const controller = new AbortController(); watcher.sseAbortController = controller; + // Resolve the read target once (proxy URL + token, or Django). The server owns the decision; + // reused across reconnects so transport churn never re-mints a token. + if (!watcher.streamTargetResolved) { + await this.resolveStreamTarget(watcher); + const resolvedWatcher = this.watchers.get(key); + if ( + !resolvedWatcher || + resolvedWatcher !== watcher || + controller.signal.aborted + ) { + return; + } + } + + const usingProxy = Boolean( + watcher.streamBaseUrl && watcher.streamReadToken, + ); + const base = usingProxy + ? watcher.streamBaseUrl?.replace(/\/+$/, "") + : watcher.apiHost; const url = new URL( - `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, + `${base}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream/`, ); if (options?.startLatest && !watcher.lastEventId) { url.searchParams.set("start", "latest"); @@ -630,6 +658,9 @@ export class CloudTaskService extends TypedEventEmitter { if (watcher.lastEventId) { headers["Last-Event-ID"] = watcher.lastEventId; } + if (usingProxy) { + headers.Authorization = `Bearer ${watcher.streamReadToken}`; + } const parser = new SseEventParser(); const decoder = new TextDecoder(); @@ -641,15 +672,19 @@ export class CloudTaskService extends TypedEventEmitter { let streamWasEstablished = false; try { - const response = await this.authService.authenticatedFetch( - fetch, - url.toString(), - { - method: "GET", - headers, - signal: controller.signal, - }, - ); + // The proxy authenticates with the run-scoped Bearer token, not the user session, so it + // takes a plain fetch. The Django leg still goes through authenticatedFetch. + const response = usingProxy + ? await fetch(url.toString(), { + method: "GET", + headers, + signal: controller.signal, + }) + : await this.authService.authenticatedFetch(fetch, url.toString(), { + method: "GET", + headers, + signal: controller.signal, + }); if (!response.ok) { throw createStreamStatusError(response.status); @@ -1340,6 +1375,39 @@ export class CloudTaskService extends TypedEventEmitter { } } + private async resolveStreamTarget(watcher: WatcherState): Promise { + const url = `${watcher.apiHost}/api/projects/${watcher.teamId}/tasks/${watcher.taskId}/runs/${watcher.runId}/stream_token/`; + try { + const response = await this.authService.authenticatedFetch(fetch, url, { + method: "GET", + }); + if (!response.ok) { + // Reachable but refused: read from Django and don't retry resolution. + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + watcher.streamTargetResolved = true; + return; + } + const data = (await response.json()) as { + token?: string; + stream_base_url?: string | null; + }; + watcher.streamReadToken = data.token ?? null; + watcher.streamBaseUrl = data.stream_base_url ?? null; + watcher.streamTargetResolved = true; + } catch (error) { + // Transient failure: leave unresolved so the next reconnect retries; connectSse falls back + // to the Django leg meanwhile since streamBaseUrl stays null. + watcher.streamBaseUrl = null; + watcher.streamReadToken = null; + log.warn("Cloud task stream target resolution failed", { + taskId: watcher.taskId, + runId: watcher.runId, + error, + }); + } + } + private async fetchTaskRun( watcher: WatcherState, ): Promise { From 48dbbf57db62c1b293b3bdc53576653e25c982e1 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 2 Jun 2026 20:06:18 -0700 Subject: [PATCH 4/4] feat(cloud-task): make reconnect loop status-unaware --- .../main/services/cloud-task/service.test.ts | 125 +++++------------- .../src/main/services/cloud-task/service.ts | 93 +++---------- 2 files changed, 49 insertions(+), 169 deletions(-) diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index cb112c3a7c..2dbcc2d423 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -841,9 +841,9 @@ describe("CloudTaskService", () => { ); expect(mockStreamFetch.mock.calls.length).toBe(6); - // 2 bootstrap calls + 1 post-bootstrap status verification + 6 - // handleStreamCompletion calls (one per stream error) - expect(mockNetFetch).toHaveBeenCalledTimes(9); + // Status is no longer polled per reconnect. Only the 2 bootstrap calls plus the single + // post-bootstrap verification touch the status endpoint; reconnects never do. + expect(mockNetFetch.mock.calls.length).toBeLessThanOrEqual(3); expect(updates).toContainEqual({ taskId: "task-1", runId: "run-1", @@ -1012,12 +1012,12 @@ describe("CloudTaskService", () => { }); }); - it("stops the watcher without reconnecting once the run is terminal", async () => { + it("reconnects on a clean EOF even after the run status goes terminal (status-unaware)", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - + // Bootstrap sees an active run (so it streams); every later status fetch reports terminal. + // Pre-decoupling, a clean EOF on a terminal run stopped the watch. Now run status is never + // consulted to decide reconnects, so the clean EOFs keep reconnecting until stream-end. let statusFetchCount = 0; mockNetFetch.mockImplementation((input: string | Request) => { const url = typeof input === "string" ? input : input.url; @@ -1027,7 +1027,6 @@ describe("CloudTaskService", () => { ); } statusFetchCount += 1; - // Bootstrap sees an active run; the post-stream status check sees terminal. return Promise.resolve( createJsonResponse({ id: "run-1", @@ -1056,22 +1055,14 @@ describe("CloudTaskService", () => { }); await waitFor(() => mockStreamFetch.mock.calls.length === 1); - await vi.advanceTimersByTimeAsync(10_000); + await waitFor(() => mockStreamFetch.mock.calls.length >= 3, 20_000); - expect(updates).toContainEqual( - expect.objectContaining({ - taskId: "task-1", - runId: "run-1", - kind: "status", - status: "completed", - }), - ); - expect(mockStreamFetch.mock.calls.length).toBe(1); + // Terminal status did not stop the watch; the watcher is still reconnecting. expect( (service as unknown as { watchers: Map }).watchers.has( "task-1:run-1", ), - ).toBe(false); + ).toBe(true); }); it("surfaces a retryable error when the backend errors even on a long-lived stream", async () => { @@ -1442,16 +1433,19 @@ describe("CloudTaskService", () => { expect(getWatcher()?.failed).toBe(false); }); - it("surfaces an error instead of retrying forever when run-state fetch keeps failing after a clean stream end", async () => { + it("does not poll run status per reconnect on clean EOFs (status-unaware)", async () => { vi.useFakeTimers(); - const updates: unknown[] = []; - service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); - - // Bootstrap succeeds (run + empty backlog); every subsequent run-state - // fetch returns 500 (a non-fatal status -> fetchTaskRun resolves null). - mockNetFetch - .mockResolvedValueOnce( + let statusFetchCount = 0; + mockNetFetch.mockImplementation((input: string | Request) => { + const url = typeof input === "string" ? input : input.url; + if (url.includes("/session_logs/")) { + return Promise.resolve( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + } + statusFetchCount += 1; + return Promise.resolve( createJsonResponse({ id: "run-1", status: "in_progress", @@ -1461,40 +1455,14 @@ describe("CloudTaskService", () => { branch: "main", updated_at: "2026-01-01T00:00:00Z", }), - ) // bootstrap: fetchTaskRun - .mockResolvedValueOnce( - createJsonResponse([], 200, { "X-Has-More": "false" }), - ) // bootstrap: fetchSessionLogs - .mockImplementation(() => - Promise.resolve(createJsonResponse({ detail: "boom" }, 500)), - ); - - // First connection is held open so bootstrap can finish; the test then - // closes it cleanly. Every later connection ends cleanly on its own, so the - // only thing that can fail is the post-stream run-state fetch (500). - let streamCall = 0; - const firstControllerRef: { - current: ReadableStreamDefaultController | null; - } = { current: null }; - mockStreamFetch.mockImplementation(() => { - streamCall += 1; - const stream = new ReadableStream({ - start(controller) { - if (streamCall === 1) { - firstControllerRef.current = controller; - } else { - controller.close(); - } - }, - }); - return Promise.resolve( - new Response(stream, { - status: 200, - headers: { "Content-Type": "text/event-stream" }, - }), ); }); + // Every connection ends cleanly with no stream-end sentinel, forcing reconnect after reconnect. + mockStreamFetch.mockImplementation(() => + Promise.resolve(createSseResponse("")), + ); + service.watch({ taskId: "task-1", runId: "run-1", @@ -1502,44 +1470,11 @@ describe("CloudTaskService", () => { teamId: 2, }); - // Wait for bootstrap to emit its snapshot and hold the live connection open. - await waitFor( - () => - !!firstControllerRef.current && - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "snapshot", - ), - ); - - // Close the live stream cleanly: each clean end now fetches run state, which - // 500s. The reconnect must charge the budget so it eventually gives up. - firstControllerRef.current?.close(); - - // Budget is 5 attempts (2s + 4s + 8s + 16s + 30s + 30s of backoff). - await vi.advanceTimersByTimeAsync(120_000); - await waitFor( - () => - updates.some( - (u) => - typeof u === "object" && - u !== null && - (u as { kind?: string }).kind === "error", - ), - 20_000, - ); + await waitFor(() => mockStreamFetch.mock.calls.length >= 5, 20_000); - expect(updates).toContainEqual({ - taskId: "task-1", - runId: "run-1", - kind: "error", - errorTitle: "Cloud run state unavailable", - errorMessage: - "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", - retryable: true, - }); + // Bootstrap fetches status once and the post-bootstrap verification once more; reconnects add + // none. Pre-decoupling, every clean EOF polled status, so this count would climb with reconnects. + expect(statusFetchCount).toBeLessThanOrEqual(2); }); const guardedFetchStatusExpectations = [ diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index ff6645453b..f85c6bd9a5 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -570,10 +570,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - if ( - watcher.needsStopAfterBootstrap || - isTerminalStatus(watcher.lastStatus) - ) { + if (watcher.needsStopAfterBootstrap) { watcher.needsStopAfterBootstrap = false; this.stopWatcher(key); return; @@ -727,7 +724,7 @@ export class CloudTaskService extends TypedEventEmitter { return; } - await this.handleStreamCompletion(key, { reconnectIfNonTerminal: true }); + await this.handleStreamCompletion(key, { reconnectOnDisconnect: true }); } catch (error) { this.flushLogBatch(key); @@ -768,7 +765,7 @@ export class CloudTaskService extends TypedEventEmitter { isBackendError, }); await this.handleStreamCompletion(key, { - reconnectIfNonTerminal: true, + reconnectOnDisconnect: true, reconnectError: error, countReconnectAttempt: !isBackendError && !wasHealthyStream, }); @@ -1084,7 +1081,9 @@ export class CloudTaskService extends TypedEventEmitter { options: { countAttempt?: boolean } = {}, ): void { const watcher = this.watchers.get(key); - if (!watcher || watcher.failed || isTerminalStatus(watcher.lastStatus)) { + // No isTerminalStatus gate: the reconnect loop is status-unaware and only stops on the + // stream-end sentinel (handled in handleStreamCompletion) or the budget exhaustion below. + if (!watcher || watcher.failed) { return; } @@ -1155,7 +1154,7 @@ export class CloudTaskService extends TypedEventEmitter { private async handleStreamCompletion( key: string, options: { - reconnectIfNonTerminal: boolean; + reconnectOnDisconnect: boolean; reconnectError?: unknown; countReconnectAttempt?: boolean; }, @@ -1164,92 +1163,38 @@ export class CloudTaskService extends TypedEventEmitter { if (!watcher) return; if (watcher.failed) return; - // Durable end-of-stream: the server signalled this run's stream is complete, so - // stop without polling run status. This is the status-unaware durable-stream contract - // — absent this sentinel we keep reconnecting (below) to ride out transport churn. + // The durable end-of-stream sentinel is the ONLY signal that ends the watch. The client is + // unaware of sandbox/run status and assumes the transport breaks mid-message constantly, so any + // disconnect without stream-end is just transport churn to ride out by reconnecting. We never + // poll run status to decide whether to stop — that would drop the tail if the stream cut right as + // the run went terminal but before stream-end arrived. Status is updated for display only, from + // the task_run_state events the stream itself carries. if (watcher.streamEnded) { this.emitStatusUpdate(watcher); this.stopWatcher(key); return; } - const { reconnectIfNonTerminal } = options; - const run = await this.fetchTaskRun(watcher); - const currentWatcher = this.watchers.get(key); - if (!currentWatcher || currentWatcher !== watcher) return; - if (watcher.failed) return; + const { reconnectOnDisconnect } = options; if (watcher.isBootstrapping) { - if (!run) { + // Bootstrap still owns the snapshot lifecycle; record reconnect intent and let it finish. + if (reconnectOnDisconnect) { watcher.needsPostBootstrapReconnect = true; - return; - } - - this.applyTaskRunState(watcher, run); - if (isTerminalStatus(watcher.lastStatus) || !reconnectIfNonTerminal) { - watcher.needsStopAfterBootstrap = true; } else { - watcher.needsPostBootstrapReconnect = true; + watcher.needsStopAfterBootstrap = true; } return; } - if (!run) { - this.scheduleReconnect( - key, - new CloudTaskStreamError("Failed to fetch terminal cloud run state", { - title: "Cloud run state unavailable", - message: - "Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.", - retryable: true, - }), - { countAttempt: options.countReconnectAttempt ?? true }, - ); - return; - } - - const stateChanged = this.applyTaskRunState(watcher, run); - - if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) { - if (stateChanged) { - // Polled progress proves the run is alive — reset both budgets. - watcher.reconnectAttempts = 0; - watcher.cumulativeReconnectAttempts = 0; - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: watcher.lastStatus ?? undefined, - stage: watcher.lastStage, - output: watcher.lastOutput, - errorMessage: watcher.lastErrorMessage, - branch: watcher.lastBranch, - }); - } - log.warn("Cloud task stream ended before terminal status", { - key, - status: watcher.lastStatus, - }); + if (reconnectOnDisconnect) { this.scheduleReconnect(key, options.reconnectError, { countAttempt: options.countReconnectAttempt ?? false, }); return; } - // Always emit the latest status before stopping. Terminal states are - // intentionally deferred until stream completion; clean EOFs can also mean - // the backend has no more stream events even when the run status remains active. - this.emit(CloudTaskEvent.Update, { - taskId: watcher.taskId, - runId: watcher.runId, - kind: "status", - status: watcher.lastStatus ?? undefined, - stage: watcher.lastStage, - output: watcher.lastOutput, - errorMessage: watcher.lastErrorMessage, - branch: watcher.lastBranch, - }); - + this.emitStatusUpdate(watcher); this.stopWatcher(key); }