From 08bbef8a8b66de0708f7d6f4473242c68756e4f2 Mon Sep 17 00:00:00 2001 From: brvale97 Date: Tue, 5 May 2026 05:36:22 +0200 Subject: [PATCH] fix(server): keep active turns running until session settles --- .../Layers/ProjectionPipeline.test.ts | 276 ++++++++++++++++++ .../Layers/ProjectionPipeline.ts | 52 +++- 2 files changed, 314 insertions(+), 14 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 7f364c717a7..d692e5cf08e 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -2165,6 +2165,282 @@ it.effect("restores pending turn-start metadata across projection pipeline resta ), ); +it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-active-turn-segment-")))( + "OrchestrationProjectionPipeline", + (it) => { + it.effect("keeps the active running turn open across non-streaming assistant segments", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + const projectId = ProjectId.make("project-active-turn-segment"); + const threadId = ThreadId.make("thread-active-turn-segment"); + const turnId = TurnId.make("turn-active-segment"); + const userMessageId = MessageId.make("user-active-segment"); + const interimMessageId = MessageId.make("assistant-active-segment-interim"); + const finalMessageId = MessageId.make("assistant-active-segment-final"); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.make("evt-active-segment-1"), + aggregateKind: "project", + aggregateId: projectId, + occurredAt: "2026-05-05T03:20:00.000Z", + commandId: CommandId.make("cmd-active-segment-1"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-1"), + metadata: {}, + payload: { + projectId, + title: "Active Turn Segment Project", + workspaceRoot: "/tmp/active-turn-segment", + defaultModelSelection: null, + scripts: [], + createdAt: "2026-05-05T03:20:00.000Z", + updatedAt: "2026-05-05T03:20:00.000Z", + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.make("evt-active-segment-2"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: "2026-05-05T03:20:01.000Z", + commandId: CommandId.make("cmd-active-segment-2"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-2"), + metadata: {}, + payload: { + threadId, + projectId, + title: "Active Turn Segment Thread", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: "2026-05-05T03:20:01.000Z", + updatedAt: "2026-05-05T03:20:01.000Z", + }, + }); + + yield* appendAndProject({ + type: "thread.turn-start-requested", + eventId: EventId.make("evt-active-segment-3"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: "2026-05-05T03:20:04.714Z", + commandId: CommandId.make("cmd-active-segment-3"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-3"), + metadata: {}, + payload: { + threadId, + messageId: userMessageId, + runtimeMode: "full-access", + createdAt: "2026-05-05T03:20:04.714Z", + }, + }); + + yield* appendAndProject({ + type: "thread.session-set", + eventId: EventId.make("evt-active-segment-4"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: "2026-05-05T03:20:20.549Z", + commandId: CommandId.make("cmd-active-segment-4"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-4"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "running", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: turnId, + lastError: null, + updatedAt: "2026-05-05T03:20:20.549Z", + }, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-active-segment-5"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: "2026-05-05T03:20:31.834Z", + commandId: CommandId.make("cmd-active-segment-5"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-5"), + metadata: {}, + payload: { + threadId, + messageId: interimMessageId, + role: "assistant", + text: "", + turnId, + streaming: false, + createdAt: "2026-05-05T03:20:31.834Z", + updatedAt: "2026-05-05T03:20:31.834Z", + }, + }); + + const runningRows = yield* sql<{ + readonly state: string; + readonly completedAt: string | null; + readonly assistantMessageId: string | null; + readonly sessionStatus: string; + readonly activeTurnId: string | null; + }>` + SELECT + turns.state, + turns.completed_at AS "completedAt", + turns.assistant_message_id AS "assistantMessageId", + sessions.status AS "sessionStatus", + sessions.active_turn_id AS "activeTurnId" + FROM projection_turns AS turns + JOIN projection_thread_sessions AS sessions + ON sessions.thread_id = turns.thread_id + WHERE turns.thread_id = ${threadId} + AND turns.turn_id = ${turnId} + `; + + assert.deepEqual(runningRows, [ + { + state: "running", + completedAt: null, + assistantMessageId: "assistant-active-segment-interim", + sessionStatus: "running", + activeTurnId: "turn-active-segment", + }, + ]); + + yield* appendAndProject({ + type: "thread.session-set", + eventId: EventId.make("evt-active-segment-6"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: "2026-05-05T03:20:45.000Z", + commandId: CommandId.make("cmd-active-segment-6"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-6"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "ready", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: "2026-05-05T03:20:45.000Z", + }, + }, + }); + + yield* appendAndProject({ + type: "thread.message-sent", + eventId: EventId.make("evt-active-segment-7"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: "2026-05-05T03:20:46.000Z", + commandId: CommandId.make("cmd-active-segment-7"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-active-segment-7"), + metadata: {}, + payload: { + threadId, + messageId: finalMessageId, + role: "assistant", + text: "", + turnId, + streaming: false, + createdAt: "2026-05-05T03:20:46.000Z", + updatedAt: "2026-05-05T03:20:46.000Z", + }, + }); + + const completedRows = yield* sql<{ + readonly state: string; + readonly completedAt: string | null; + readonly assistantMessageId: string | null; + readonly sessionStatus: string; + readonly activeTurnId: string | null; + }>` + SELECT + turns.state, + turns.completed_at AS "completedAt", + turns.assistant_message_id AS "assistantMessageId", + sessions.status AS "sessionStatus", + sessions.active_turn_id AS "activeTurnId" + FROM projection_turns AS turns + JOIN projection_thread_sessions AS sessions + ON sessions.thread_id = turns.thread_id + WHERE turns.thread_id = ${threadId} + AND turns.turn_id = ${turnId} + `; + + assert.deepEqual(completedRows, [ + { + state: "completed", + completedAt: "2026-05-05T03:20:45.000Z", + assistantMessageId: "assistant-active-segment-final", + sessionStatus: "ready", + activeTurnId: null, + }, + ]); + + yield* sql`DELETE FROM projection_turns`; + yield* sql`DELETE FROM projection_thread_sessions`; + yield* sql`DELETE FROM projection_state`; + yield* projectionPipeline.bootstrap; + + const rebuiltRows = yield* sql<{ + readonly state: string; + readonly completedAt: string | null; + readonly assistantMessageId: string | null; + readonly sessionStatus: string; + readonly activeTurnId: string | null; + }>` + SELECT + turns.state, + turns.completed_at AS "completedAt", + turns.assistant_message_id AS "assistantMessageId", + sessions.status AS "sessionStatus", + sessions.active_turn_id AS "activeTurnId" + FROM projection_turns AS turns + JOIN projection_thread_sessions AS sessions + ON sessions.thread_id = turns.thread_id + WHERE turns.thread_id = ${threadId} + AND turns.turn_id = ${turnId} + `; + + assert.deepEqual(rebuiltRows, [ + { + state: "completed", + completedAt: "2026-05-05T03:20:45.000Z", + assistantMessageId: "assistant-active-segment-final", + sessionStatus: "ready", + activeTurnId: null, + }, + ]); + }), + ); + }, +); + const engineLayer = it.layer( OrchestrationEngineLive.pipe( Layer.provide(OrchestrationProjectionSnapshotQueryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 3ef8b38d642..5669f5f2f20 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -993,6 +993,25 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti case "thread.session-set": { const turnId = event.payload.session.activeTurnId; if (turnId === null || event.payload.session.status !== "running") { + if (event.payload.session.status === "running") { + return; + } + const existingTurns = yield* projectionTurnRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const runningTurn = existingTurns + .toReversed() + .find((entry) => entry.turnId !== null && entry.state === "running"); + if (!runningTurn || runningTurn.turnId === null) { + return; + } + + yield* projectionTurnRepository.upsertByTurnId({ + ...runningTurn, + turnId: runningTurn.turnId, + state: event.payload.session.status === "error" ? "error" : "completed", + completedAt: runningTurn.completedAt ?? event.payload.session.updatedAt, + }); return; } @@ -1004,13 +1023,11 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti threadId: event.payload.threadId, }); if (Option.isSome(existingTurn)) { - const nextState = - existingTurn.value.state === "completed" || existingTurn.value.state === "error" - ? existingTurn.value.state - : "running"; + const nextState = existingTurn.value.state === "error" ? "error" : "running"; yield* projectionTurnRepository.upsertByTurnId({ ...existingTurn.value, state: nextState, + completedAt: nextState === "running" ? null : existingTurn.value.completedAt, pendingMessageId: existingTurn.value.pendingMessageId ?? (Option.isSome(pendingTurnStart) ? pendingTurnStart.value.messageId : null), @@ -1079,19 +1096,26 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti turnId: event.payload.turnId, }); if (Option.isSome(existingTurn)) { + const preserveSettledFailure = + existingTurn.value.state === "interrupted" || existingTurn.value.state === "error"; + const nextState = event.payload.streaming + ? existingTurn.value.state + : existingTurn.value.state === "running" + ? "running" + : preserveSettledFailure + ? existingTurn.value.state + : "completed"; + const completedAt = + event.payload.streaming || existingTurn.value.state === "running" + ? existingTurn.value.completedAt + : preserveSettledFailure + ? existingTurn.value.completedAt + : (existingTurn.value.completedAt ?? event.payload.updatedAt); yield* projectionTurnRepository.upsertByTurnId({ ...existingTurn.value, assistantMessageId: event.payload.messageId, - state: event.payload.streaming - ? existingTurn.value.state - : existingTurn.value.state === "interrupted" - ? "interrupted" - : existingTurn.value.state === "error" - ? "error" - : "completed", - completedAt: event.payload.streaming - ? existingTurn.value.completedAt - : (existingTurn.value.completedAt ?? event.payload.updatedAt), + state: nextState, + completedAt, startedAt: existingTurn.value.startedAt ?? event.payload.createdAt, requestedAt: existingTurn.value.requestedAt ?? event.payload.createdAt, });