Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 153 additions & 98 deletions apps/code/src/main/services/cloud-task/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ import { CloudTaskEvent } from "./schemas";

const mockNetFetch = vi.hoisted(() => vi.fn());
const mockStreamFetch = vi.hoisted(() => vi.fn());
const mockStreamTokenFetch = vi.hoisted(() => vi.fn());

// The service now uses global fetch for BOTH authenticated API calls (JSON)
// and SSE streaming. The two used to be distinct (net.fetch vs global fetch).
// To preserve the existing test fixtures, route by URL: /stream/ → stream mock,
// everything else → API mock.
// Route by URL: /stream_token/ → token mock (read-leg resolution), /stream/ → stream mock,
// everything else → API mock. The token mock has a Django-path default so existing fixtures
// (which never set it) are untouched.
const fetchRouter = vi.hoisted(() =>
vi.fn((input: string | Request, init?: RequestInit) => {
const url = typeof input === "string" ? input : input.url;
const impl = url.includes("/stream/") ? mockStreamFetch : mockNetFetch;
const impl = url.includes("/stream_token/")
? mockStreamTokenFetch
: url.includes("/stream/")
? mockStreamFetch
: mockNetFetch;
return impl(input, init);
}),
);
Expand Down Expand Up @@ -97,6 +103,13 @@ describe("CloudTaskService", () => {
service = new CloudTaskService(mockAuthService as never);
mockNetFetch.mockReset();
mockStreamFetch.mockReset();
mockStreamTokenFetch.mockReset();
// Default read-leg resolution: no proxy URL, so the stream reads from Django directly.
mockStreamTokenFetch.mockImplementation(() =>
Promise.resolve(
createJsonResponse({ token: "test-token", stream_base_url: null }),
),
);
mockAuthService.authenticatedFetch.mockReset();
vi.stubGlobal("fetch", fetchRouter);

Expand Down Expand Up @@ -604,6 +617,113 @@ describe("CloudTaskService", () => {
).toBe(true);
});

it("stops without reconnecting when the server emits stream-end on a non-terminal run", async () => {
vi.useFakeTimers();

// Run status stays non-terminal the whole time. Pre-durable-contract, a clean EOF on a
// non-terminal run reconnects (see the test above); the stream-end sentinel must override that.
mockNetFetch.mockImplementation((input: string | Request) => {
const url = typeof input === "string" ? input : input.url;
if (url.includes("/session_logs/")) {
return Promise.resolve(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);
}
return Promise.resolve(
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: "build",
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
}),
);
});

mockStreamFetch.mockImplementation(() =>
Promise.resolve(
createSseResponse(
'id: 1\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:01Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"hi"}}}\n\nevent: stream-end\ndata: {}\n\n',
),
),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

const hasWatcher = (): boolean =>
(service as unknown as { watchers: Map<string, unknown> }).watchers.has(
"task-1:run-1",
);

await waitFor(() => mockStreamFetch.mock.calls.length === 1);
// Let the reconnect delay (2s base) elapse; with stream-end honored, none is scheduled.
await vi.advanceTimersByTimeAsync(10_000);

expect(mockStreamFetch.mock.calls.length).toBe(1);
await waitFor(() => !hasWatcher());
});

it("reads via the agent-proxy with a Bearer token when the server resolves a base url", async () => {
vi.useFakeTimers();

mockStreamTokenFetch.mockImplementation(() =>
Promise.resolve(
createJsonResponse({
token: "proxy-token",
stream_base_url: "https://proxy.example",
}),
),
);

mockNetFetch.mockImplementation((input: string | Request) => {
const url = typeof input === "string" ? input : input.url;
if (url.includes("/session_logs/")) {
return Promise.resolve(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);
}
return Promise.resolve(
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: "build",
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
}),
);
});

mockStreamFetch.mockImplementation(() =>
Promise.resolve(createSseResponse("event: stream-end\ndata: {}\n\n")),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() => mockStreamFetch.mock.calls.length === 1);

const [calledUrl, init] = mockStreamFetch.mock.calls[0];
expect(String(calledUrl)).toMatch(
/^https:\/\/proxy\.example\/api\/projects\/2\/tasks\/task-1\/runs\/run-1\/stream\/(\?|$)/,
);
expect((init?.headers as Record<string, string>)?.Authorization).toBe(
"Bearer proxy-token",
);
});

it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => {
vi.useFakeTimers();

Expand Down Expand Up @@ -721,9 +841,9 @@ describe("CloudTaskService", () => {
);

expect(mockStreamFetch.mock.calls.length).toBe(6);
// 2 bootstrap calls + 1 post-bootstrap status verification + 6
// handleStreamCompletion calls (one per stream error)
expect(mockNetFetch).toHaveBeenCalledTimes(9);
// Status is no longer polled per reconnect. Only the 2 bootstrap calls plus the single
// post-bootstrap verification touch the status endpoint; reconnects never do.
expect(mockNetFetch.mock.calls.length).toBeLessThanOrEqual(3);
expect(updates).toContainEqual({
taskId: "task-1",
runId: "run-1",
Expand Down Expand Up @@ -892,12 +1012,12 @@ describe("CloudTaskService", () => {
});
});

it("stops the watcher without reconnecting once the run is terminal", async () => {
it("reconnects on a clean EOF even after the run status goes terminal (status-unaware)", async () => {
vi.useFakeTimers();

const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

// Bootstrap sees an active run (so it streams); every later status fetch reports terminal.
// Pre-decoupling, a clean EOF on a terminal run stopped the watch. Now run status is never
// consulted to decide reconnects, so the clean EOFs keep reconnecting until stream-end.
let statusFetchCount = 0;
mockNetFetch.mockImplementation((input: string | Request) => {
const url = typeof input === "string" ? input : input.url;
Expand All @@ -907,7 +1027,6 @@ describe("CloudTaskService", () => {
);
}
statusFetchCount += 1;
// Bootstrap sees an active run; the post-stream status check sees terminal.
return Promise.resolve(
createJsonResponse({
id: "run-1",
Expand Down Expand Up @@ -936,22 +1055,14 @@ describe("CloudTaskService", () => {
});

await waitFor(() => mockStreamFetch.mock.calls.length === 1);
await vi.advanceTimersByTimeAsync(10_000);
await waitFor(() => mockStreamFetch.mock.calls.length >= 3, 20_000);

expect(updates).toContainEqual(
expect.objectContaining({
taskId: "task-1",
runId: "run-1",
kind: "status",
status: "completed",
}),
);
expect(mockStreamFetch.mock.calls.length).toBe(1);
// Terminal status did not stop the watch; the watcher is still reconnecting.
expect(
(service as unknown as { watchers: Map<string, unknown> }).watchers.has(
"task-1:run-1",
),
).toBe(false);
).toBe(true);
});

it("surfaces a retryable error when the backend errors even on a long-lived stream", async () => {
Expand Down Expand Up @@ -1322,16 +1433,19 @@ describe("CloudTaskService", () => {
expect(getWatcher()?.failed).toBe(false);
});

it("surfaces an error instead of retrying forever when run-state fetch keeps failing after a clean stream end", async () => {
it("does not poll run status per reconnect on clean EOFs (status-unaware)", async () => {
vi.useFakeTimers();

const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

// Bootstrap succeeds (run + empty backlog); every subsequent run-state
// fetch returns 500 (a non-fatal status -> fetchTaskRun resolves null).
mockNetFetch
.mockResolvedValueOnce(
let statusFetchCount = 0;
mockNetFetch.mockImplementation((input: string | Request) => {
const url = typeof input === "string" ? input : input.url;
if (url.includes("/session_logs/")) {
return Promise.resolve(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);
}
statusFetchCount += 1;
return Promise.resolve(
createJsonResponse({
id: "run-1",
status: "in_progress",
Expand All @@ -1341,85 +1455,26 @@ describe("CloudTaskService", () => {
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
}),
) // bootstrap: fetchTaskRun
.mockResolvedValueOnce(
createJsonResponse([], 200, { "X-Has-More": "false" }),
) // bootstrap: fetchSessionLogs
.mockImplementation(() =>
Promise.resolve(createJsonResponse({ detail: "boom" }, 500)),
);

// First connection is held open so bootstrap can finish; the test then
// closes it cleanly. Every later connection ends cleanly on its own, so the
// only thing that can fail is the post-stream run-state fetch (500).
let streamCall = 0;
const firstControllerRef: {
current: ReadableStreamDefaultController<Uint8Array> | null;
} = { current: null };
mockStreamFetch.mockImplementation(() => {
streamCall += 1;
const stream = new ReadableStream<Uint8Array>({
start(controller) {
if (streamCall === 1) {
firstControllerRef.current = controller;
} else {
controller.close();
}
},
});
return Promise.resolve(
new Response(stream, {
status: 200,
headers: { "Content-Type": "text/event-stream" },
}),
);
});

// Every connection ends cleanly with no stream-end sentinel, forcing reconnect after reconnect.
mockStreamFetch.mockImplementation(() =>
Promise.resolve(createSseResponse("")),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

// Wait for bootstrap to emit its snapshot and hold the live connection open.
await waitFor(
() =>
!!firstControllerRef.current &&
updates.some(
(u) =>
typeof u === "object" &&
u !== null &&
(u as { kind?: string }).kind === "snapshot",
),
);

// Close the live stream cleanly: each clean end now fetches run state, which
// 500s. The reconnect must charge the budget so it eventually gives up.
firstControllerRef.current?.close();
await waitFor(() => mockStreamFetch.mock.calls.length >= 5, 20_000);

// Budget is 5 attempts (2s + 4s + 8s + 16s + 30s + 30s of backoff).
await vi.advanceTimersByTimeAsync(120_000);
await waitFor(
() =>
updates.some(
(u) =>
typeof u === "object" &&
u !== null &&
(u as { kind?: string }).kind === "error",
),
20_000,
);

expect(updates).toContainEqual({
taskId: "task-1",
runId: "run-1",
kind: "error",
errorTitle: "Cloud run state unavailable",
errorMessage:
"Could not fetch the latest cloud run state after the stream ended. Retry to reconnect.",
retryable: true,
});
// Bootstrap fetches status once and the post-bootstrap verification once more; reconnects add
// none. Pre-decoupling, every clean EOF polled status, so this count would climb with reconnects.
expect(statusFetchCount).toBeLessThanOrEqual(2);
});

const guardedFetchStatusExpectations = [
Expand Down
Loading
Loading