diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 837c32fc4fd..a247dcd3233 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -59,6 +59,7 @@ import { type OrchestrationEngineShape, } from "../src/orchestration/Services/OrchestrationEngine.ts"; import { ThreadDeletionReactor } from "../src/orchestration/Services/ThreadDeletionReactor.ts"; +import { QueuedTurnDrainReactor } from "../src/orchestration/Services/QueuedTurnDrainReactor.ts"; import { OrchestrationReactor } from "../src/orchestration/Services/OrchestrationReactor.ts"; import { ProjectionSnapshotQuery } from "../src/orchestration/Services/ProjectionSnapshotQuery.ts"; import { @@ -358,6 +359,12 @@ export const makeOrchestrationIntegrationHarness = ( Layer.provideMerge(runtimeIngestionLayer), Layer.provideMerge(providerCommandReactorLayer), Layer.provideMerge(checkpointReactorLayer), + Layer.provideMerge( + Layer.succeed(QueuedTurnDrainReactor, { + start: () => Effect.void, + drain: Effect.void, + }), + ), Layer.provideMerge( Layer.succeed(ThreadDeletionReactor, { start: () => Effect.void, diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 7909d5cd6b1..8b5268e5579 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -148,6 +148,7 @@ describe("OrchestrationEngine", () => { archivedAt: null, deletedAt: null, messages: [], + queuedTurns: [], proposedPlans: [], activities: [], checkpoints: [], @@ -160,6 +161,7 @@ describe("OrchestrationEngine", () => { threads: projectionSnapshot.threads.map((thread) => ({ ...thread, messages: [], + queuedTurns: [], proposedPlans: [], activities: [], checkpoints: [], diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts index 6155af8858a..36ecf4970b3 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts @@ -9,6 +9,7 @@ import { CheckpointReactor } from "../Services/CheckpointReactor.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; import { ThreadDeletionReactor } from "../Services/ThreadDeletionReactor.ts"; +import { QueuedTurnDrainReactor } from "../Services/QueuedTurnDrainReactor.ts"; import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts"; import { makeOrchestrationReactor } from "./OrchestrationReactor.ts"; @@ -22,7 +23,7 @@ describe("OrchestrationReactor", () => { runtime = null; }); - it("starts provider ingestion, provider command, checkpoint, and thread deletion reactors", async () => { + it("starts provider ingestion, provider command, queue drain, checkpoint, and thread deletion reactors", async () => { const started: string[] = []; runtime = ManagedRuntime.make( @@ -45,6 +46,15 @@ describe("OrchestrationReactor", () => { drain: Effect.void, }), ), + Layer.provideMerge( + Layer.succeed(QueuedTurnDrainReactor, { + start: () => { + started.push("queued-turn-drain-reactor"); + return Effect.void; + }, + drain: Effect.void, + }), + ), Layer.provideMerge( Layer.succeed(CheckpointReactor, { start: () => { @@ -73,6 +83,7 @@ describe("OrchestrationReactor", () => { expect(started).toEqual([ "provider-runtime-ingestion", "provider-command-reactor", + "queued-turn-drain-reactor", "checkpoint-reactor", "thread-deletion-reactor", ]); diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts index 5e432d9884f..71203e152d7 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts @@ -8,17 +8,20 @@ import { import { CheckpointReactor } from "../Services/CheckpointReactor.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; +import { QueuedTurnDrainReactor } from "../Services/QueuedTurnDrainReactor.ts"; import { ThreadDeletionReactor } from "../Services/ThreadDeletionReactor.ts"; export const makeOrchestrationReactor = Effect.gen(function* () { const providerRuntimeIngestion = yield* ProviderRuntimeIngestionService; const providerCommandReactor = yield* ProviderCommandReactor; const checkpointReactor = yield* CheckpointReactor; + const queuedTurnDrainReactor = yield* QueuedTurnDrainReactor; const threadDeletionReactor = yield* ThreadDeletionReactor; const start: OrchestrationReactorShape["start"] = Effect.fn("start")(function* () { yield* providerRuntimeIngestion.start(); yield* providerCommandReactor.start(); + yield* queuedTurnDrainReactor.start(); yield* checkpointReactor.start(); yield* threadDeletionReactor.start(); }); diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 1161ff6a7d7..f7bf4084ed6 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -4,6 +4,10 @@ import { type OrchestrationEvent, ThreadId, } from "@t3tools/contracts"; +import { + getQueuedTurnLifecycleOperation, + type QueuedTurnLifecycleOperation, +} from "@t3tools/shared/orchestrationQueue"; import * as Effect from "effect/Effect"; import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; @@ -16,6 +20,7 @@ import { toPersistenceSqlError, type ProjectionRepositoryError } from "../../per import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts"; import { ProjectionPendingApprovalRepository } from "../../persistence/Services/ProjectionPendingApprovals.ts"; import { ProjectionProjectRepository } from "../../persistence/Services/ProjectionProjects.ts"; +import { ProjectionQueuedTurnRepository } from "../../persistence/Services/ProjectionQueuedTurns.ts"; import { ProjectionStateRepository } from "../../persistence/Services/ProjectionState.ts"; import { ProjectionThreadActivityRepository } from "../../persistence/Services/ProjectionThreadActivities.ts"; import { type ProjectionThreadActivity } from "../../persistence/Services/ProjectionThreadActivities.ts"; @@ -35,6 +40,7 @@ import { import { ProjectionThreadRepository } from "../../persistence/Services/ProjectionThreads.ts"; import { ProjectionPendingApprovalRepositoryLive } from "../../persistence/Layers/ProjectionPendingApprovals.ts"; import { ProjectionProjectRepositoryLive } from "../../persistence/Layers/ProjectionProjects.ts"; +import { ProjectionQueuedTurnRepositoryLive } from "../../persistence/Layers/ProjectionQueuedTurns.ts"; import { ProjectionStateRepositoryLive } from "../../persistence/Layers/ProjectionState.ts"; import { ProjectionThreadActivityRepositoryLive } from "../../persistence/Layers/ProjectionThreadActivities.ts"; import { ProjectionThreadMessageRepositoryLive } from "../../persistence/Layers/ProjectionThreadMessages.ts"; @@ -62,6 +68,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = { threadActivities: "projection.thread-activities", threadSessions: "projection.thread-sessions", threadTurns: "projection.thread-turns", + queuedTurns: "projection.queued-turns", checkpoints: "projection.checkpoints", pendingApprovals: "projection.pending-approvals", } as const; @@ -456,6 +463,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; const projectionTurnRepository = yield* ProjectionTurnRepository; const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; + const projectionQueuedTurnRepository = yield* ProjectionQueuedTurnRepository; const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; @@ -980,6 +988,67 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti }); }); + const updateQueuedTurnStatus = Effect.fn("updateQueuedTurnStatus")(function* ( + operation: Extract, + ) { + const existing = yield* projectionQueuedTurnRepository.getByQueueItemId({ + queueItemId: operation.queueItemId, + }); + if (Option.isNone(existing)) { + return; + } + yield* projectionQueuedTurnRepository.upsert({ + ...existing.value, + ...operation.patch, + }); + }); + + const applyQueuedTurnsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyQueuedTurnsProjection", + )(function* (event, _attachmentSideEffects) { + switch (event.type) { + case "thread.turn-queued": + case "thread.queued-turn-send-started": + case "thread.queued-turn-resolved": + case "thread.queued-turn-requeued": + case "thread.queued-turn-send-failed": { + const operation = getQueuedTurnLifecycleOperation(event); + switch (operation.kind) { + case "upsert": + yield* projectionQueuedTurnRepository.upsert({ + queueItemId: operation.queuedTurn.queueItemId, + threadId: operation.threadId, + request: operation.queuedTurn.request, + status: operation.queuedTurn.status, + failureReason: operation.queuedTurn.failureReason, + createdAt: operation.queuedTurn.createdAt, + updatedAt: operation.queuedTurn.updatedAt, + }); + return; + + case "update": + yield* updateQueuedTurnStatus(operation); + return; + + case "delete": + yield* projectionQueuedTurnRepository.deleteByQueueItemId({ + queueItemId: operation.queueItemId, + }); + return; + } + } + + case "thread.deleted": + yield* projectionQueuedTurnRepository.deleteByThreadId({ + threadId: event.payload.threadId, + }); + return; + + default: + return; + } + }); + const applyThreadTurnsProjection: ProjectorDefinition["apply"] = Effect.fn( "applyThreadTurnsProjection", )(function* (event, _attachmentSideEffects) { @@ -988,6 +1057,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti yield* projectionTurnRepository.replacePendingTurnStart({ threadId: event.payload.threadId, messageId: event.payload.messageId, + queueItemId: event.payload.queueItemId ?? null, sourceProposedPlanThreadId: event.payload.sourceProposedPlan?.threadId ?? null, sourceProposedPlanId: event.payload.sourceProposedPlan?.planId ?? null, requestedAt: event.payload.createdAt, @@ -1387,6 +1457,10 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti name: ORCHESTRATION_PROJECTOR_NAMES.threadTurns, apply: applyThreadTurnsProjection, }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.queuedTurns, + apply: applyQueuedTurnsProjection, + }, { name: ORCHESTRATION_PROJECTOR_NAMES.checkpoints, apply: applyCheckpointsProjection, @@ -1500,6 +1574,7 @@ export const OrchestrationProjectionPipelineLive = Layer.effect( Layer.provideMerge(ProjectionThreadActivityRepositoryLive), Layer.provideMerge(ProjectionThreadSessionRepositoryLive), Layer.provideMerge(ProjectionTurnRepositoryLive), + Layer.provideMerge(ProjectionQueuedTurnRepositoryLive), Layer.provideMerge(ProjectionPendingApprovalRepositoryLive), Layer.provideMerge(ProjectionStateRepositoryLive), ); diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index 7db2a23e5ec..3fbc6a0e29a 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -321,6 +321,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { updatedAt: "2026-02-24T00:00:05.000Z", }, ], + queuedTurns: [], proposedPlans: [ { id: "plan-1", diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 9b3c0fa7ad4..2821bed2172 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -8,9 +8,12 @@ import { OrchestrationProposedPlanId, OrchestrationReadModel, OrchestrationShellSnapshot, + OrchestrationQueuedTurn, + OrchestrationQueuedTurnStatus, OrchestrationThread, ProjectScript, TurnId, + TurnQueueItemId, type OrchestrationCheckpointSummary, type OrchestrationLatestTurn, type OrchestrationMessage, @@ -23,6 +26,7 @@ import { ModelSelection, ProjectId, ThreadId, + ThreadQueuedTurnRequest, } from "@t3tools/contracts"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; @@ -84,6 +88,15 @@ const ProjectionThreadActivityDbRowSchema = ProjectionThreadActivity.mapFields( }), ); const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession; +const ProjectionQueuedTurnDbRowSchema = Schema.Struct({ + queueItemId: TurnQueueItemId, + threadId: ThreadId, + request: Schema.fromJsonString(ThreadQueuedTurnRequest), + status: OrchestrationQueuedTurnStatus, + failureReason: Schema.NullOr(Schema.String), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, +}); const ProjectionCheckpointDbRowSchema = ProjectionCheckpoint.mapFields( Struct.assign({ files: Schema.fromJsonString(Schema.Array(OrchestrationCheckpointFile)), @@ -142,6 +155,7 @@ const REQUIRED_SNAPSHOT_PROJECTORS = [ ORCHESTRATION_PROJECTOR_NAMES.threads, ORCHESTRATION_PROJECTOR_NAMES.threadMessages, ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans, + ORCHESTRATION_PROJECTOR_NAMES.queuedTurns, ORCHESTRATION_PROJECTOR_NAMES.threadActivities, ORCHESTRATION_PROJECTOR_NAMES.threadSessions, ORCHESTRATION_PROJECTOR_NAMES.checkpoints, @@ -221,6 +235,19 @@ function mapSessionRow( }; } +function mapQueuedTurnRow( + row: Schema.Schema.Type, +): OrchestrationQueuedTurn { + return { + queueItemId: row.queueItemId, + request: row.request, + status: row.status, + failureReason: row.failureReason, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }; +} + function mapProjectShellRow( row: Schema.Schema.Type, repositoryIdentity: OrchestrationProject["repositoryIdentity"], @@ -440,6 +467,24 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listQueuedTurnRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionQueuedTurnDbRowSchema, + execute: () => + sql` + SELECT + queue.queue_item_id AS "queueItemId", + queue.thread_id AS "threadId", + queue.request_json AS "request", + queue.status AS status, + queue.failure_reason AS "failureReason", + queue.created_at AS "createdAt", + queue.updated_at AS "updatedAt" + FROM projection_queued_turns AS queue + ORDER BY queue.thread_id ASC, queue.created_at ASC, queue.queue_item_id ASC + `, + }); + const listThreadActivityRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadActivityDbRowSchema, @@ -805,6 +850,25 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listQueuedTurnRowsByThread = SqlSchema.findAll({ + Request: ThreadIdLookupInput, + Result: ProjectionQueuedTurnDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + queue.queue_item_id AS "queueItemId", + queue.thread_id AS "threadId", + queue.request_json AS "request", + queue.status AS status, + queue.failure_reason AS "failureReason", + queue.created_at AS "createdAt", + queue.updated_at AS "updatedAt" + FROM projection_queued_turns AS queue + WHERE queue.thread_id = ${threadId} + ORDER BY queue.created_at ASC, queue.queue_item_id ASC + `, + }); + const listThreadActivityRowsByThread = SqlSchema.findAll({ Request: ThreadIdLookupInput, Result: ProjectionThreadActivityDbRowSchema, @@ -964,6 +1028,14 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ), ), + listQueuedTurnRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listQueuedTurns:query", + "ProjectionSnapshotQuery.getSnapshot:listQueuedTurns:decodeRows", + ), + ), + ), listThreadActivityRows(undefined).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( @@ -1013,6 +1085,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { threadRows, messageRows, proposedPlanRows, + queuedTurnRows, activityRows, sessionRows, checkpointRows, @@ -1022,6 +1095,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { Effect.gen(function* () { const messagesByThread = new Map>(); const proposedPlansByThread = new Map>(); + const queuedTurnsByThread = new Map>(); const activitiesByThread = new Map>(); const checkpointsByThread = new Map>(); const sessionsByThread = new Map(); @@ -1070,6 +1144,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { proposedPlansByThread.set(row.threadId, threadProposedPlans); } + for (const row of queuedTurnRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadQueuedTurns = queuedTurnsByThread.get(row.threadId) ?? []; + threadQueuedTurns.push(mapQueuedTurnRow(row)); + queuedTurnsByThread.set(row.threadId, threadQueuedTurns); + } + for (const row of activityRows) { updatedAt = maxIso(updatedAt, row.createdAt); const threadActivities = activitiesByThread.get(row.threadId) ?? []; @@ -1185,6 +1266,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { archivedAt: row.archivedAt, deletedAt: row.deletedAt, messages: messagesByThread.get(row.threadId) ?? [], + queuedTurns: queuedTurnsByThread.get(row.threadId) ?? [], proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], activities: activitiesByThread.get(row.threadId) ?? [], checkpoints: checkpointsByThread.get(row.threadId) ?? [], @@ -1241,6 +1323,14 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ), ), + listQueuedTurnRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getCommandReadModel:listQueuedTurns:query", + "ProjectionSnapshotQuery.getCommandReadModel:listQueuedTurns:decodeRows", + ), + ), + ), listThreadSessionRows(undefined).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( @@ -1269,7 +1359,15 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ) .pipe( Effect.flatMap( - ([projectRows, threadRows, proposedPlanRows, sessionRows, latestTurnRows, stateRows]) => + ([ + projectRows, + threadRows, + proposedPlanRows, + queuedTurnRows, + sessionRows, + latestTurnRows, + stateRows, + ]) => Effect.sync(() => { let updatedAt: string | null = null; const projects: OrchestrationProject[] = []; @@ -1306,6 +1404,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { } updatedAt = maxIso(updatedAt, row.updatedAt); } + for (let index = 0; index < queuedTurnRows.length; index += 1) { + const row = queuedTurnRows[index]; + if (!row) { + continue; + } + updatedAt = maxIso(updatedAt, row.updatedAt); + } for (let index = 0; index < sessionRows.length; index += 1) { const row = sessionRows[index]; if (!row) { @@ -1343,6 +1448,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { latestTurnByThread.set(row.threadId, mapLatestTurn(row)); } const proposedPlansByThread = new Map>(); + const queuedTurnsByThread = new Map>(); const sessionByThread = new Map(); for (let index = 0; index < sessionRows.length; index += 1) { @@ -1363,6 +1469,16 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { proposedPlansByThread.set(row.threadId, threadProposedPlans); } + for (let index = 0; index < queuedTurnRows.length; index += 1) { + const row = queuedTurnRows[index]; + if (!row) { + continue; + } + const threadQueuedTurns = queuedTurnsByThread.get(row.threadId) ?? []; + threadQueuedTurns.push(mapQueuedTurnRow(row)); + queuedTurnsByThread.set(row.threadId, threadQueuedTurns); + } + for (let index = 0; index < threadRows.length; index += 1) { const row = threadRows[index]; if (!row) { @@ -1383,6 +1499,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { archivedAt: row.archivedAt, deletedAt: row.deletedAt, messages: [], + queuedTurns: queuedTurnsByThread.get(row.threadId) ?? [], proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], activities: [], checkpoints: [], @@ -1894,6 +2011,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { threadRow, messageRows, proposedPlanRows, + queuedTurnRows, activityRows, checkpointRows, latestTurnRow, @@ -1923,6 +2041,14 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ), ), + listQueuedTurnRowsByThread({ threadId }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getThreadDetailById:listQueuedTurns:query", + "ProjectionSnapshotQuery.getThreadDetailById:listQueuedTurns:decodeRows", + ), + ), + ), listThreadActivityRowsByThread({ threadId }).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( @@ -1990,6 +2116,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { } return message; }), + queuedTurns: queuedTurnRows.map(mapQueuedTurnRow), proposedPlans: proposedPlanRows.map(mapProposedPlanRow), activities: activityRows.map((row) => { const activity = { diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 571164fad93..8a0d2eb5b87 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -69,7 +69,7 @@ const deriveServerPathsSync = (baseDir: string, devUrl: URL | undefined) => async function waitFor( predicate: () => boolean | Promise, - timeoutMs = 2000, + timeoutMs = 5000, ): Promise { const deadline = (await Effect.runPromise(Clock.currentTimeMillis)) + timeoutMs; const poll = async (): Promise => { @@ -453,6 +453,88 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.runtimeMode).toBe("approval-required"); }); + it("reacts to queued turn send start by sending the queued user message", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + const threadId = ThreadId.make("thread-1"); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.queue", + commandId: CommandId.make("cmd-queued-turn-start"), + threadId, + message: { + messageId: asMessageId("queued-user-message-1"), + role: "user", + text: "queued provider message", + attachments: [], + }, + createdAt: now, + }), + ); + + let readModel = await harness.readModel(); + const queuedThread = readModel.threads.find((entry) => entry.id === threadId); + const queuedTurn = queuedThread?.queuedTurns[0]; + expect(queuedTurn).toMatchObject({ + status: "pending", + request: { + message: { + messageId: asMessageId("queued-user-message-1"), + text: "queued provider message", + }, + }, + }); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make("cmd-queued-session-ready"), + threadId, + session: { + threadId, + providerName: "codex", + status: "ready", + runtimeMode: "approval-required", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.queued-turn.send.start", + commandId: CommandId.make("cmd-queued-send-start"), + threadId, + mode: "normal", + createdAt: now, + }), + ); + + await waitFor(() => harness.sendTurn.mock.calls.length === 1); + expect(harness.sendTurn.mock.calls[0]?.[0]).toMatchObject({ + threadId, + input: "queued provider message", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + }); + + await waitFor(async () => { + const current = await harness.readModel(); + const thread = current.threads.find((entry) => entry.id === threadId); + return thread?.queuedTurns[0]?.status === "sending"; + }); + + readModel = await harness.readModel(); + const sendingThread = readModel.threads.find((entry) => entry.id === threadId); + expect(sendingThread?.queuedTurns[0]).toMatchObject({ + queueItemId: queuedTurn?.queueItemId, + status: "sending", + }); + }); + it("generates a thread title on the first turn", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 8b71a976808..b209add68cc 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -20,6 +20,7 @@ import * as Effect from "effect/Effect"; import * as Equal from "effect/Equal"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; +import * as Random from "effect/Random"; import * as Schema from "effect/Schema"; import * as Stream from "effect/Stream"; import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; @@ -83,6 +84,8 @@ const turnStartKeyForEvent = (event: ProviderIntentEvent): string => const serverCommandId = (tag: string): CommandId => CommandId.make(`server:${tag}:${crypto.randomUUID()}`); +const effectServerCommandId = (tag: string) => + Effect.map(Random.nextUUIDv4, (id) => CommandId.make(`server:${tag}:${id}`)); const HANDLED_TURN_START_KEY_MAX = 10_000; const HANDLED_TURN_START_KEY_TTL = Duration.minutes(30); @@ -686,18 +689,63 @@ const make = Effect.gen(function* () { return; } - const message = thread.messages.find((entry) => entry.id === event.payload.messageId); - if (!message || message.role !== "user") { + const dispatchQueuedTurnFailure = (detail: string, tag: string) => { + const queueItemId = event.payload.queueItemId; + if (queueItemId === undefined) { + return Effect.void; + } + return effectServerCommandId(tag).pipe( + Effect.flatMap((commandId) => + orchestrationEngine.dispatch({ + type: "thread.queued-turn.send.fail", + commandId, + threadId: event.payload.threadId, + queueItemId, + reason: detail, + createdAt: event.payload.createdAt, + }), + ), + ); + }; + + const queuedRequest = + event.payload.queuedRequest ?? + (event.payload.queueItemId === undefined + ? undefined + : thread.queuedTurns.find((entry) => entry.queueItemId === event.payload.queueItemId) + ?.request); + if (event.payload.queueItemId !== undefined && queuedRequest === undefined) { + const detail = `Queued turn '${event.payload.queueItemId}' was not found for turn start request.`; + yield* appendProviderFailureActivity({ + threadId: event.payload.threadId, + kind: "provider.turn.start.failed", + summary: "Provider turn start failed", + detail, + turnId: null, + createdAt: event.payload.createdAt, + }); + yield* dispatchQueuedTurnFailure(detail, "queued-turn-send-missing-request"); + return; + } + const message = + queuedRequest?.message ?? + thread.messages.find( + (entry) => entry.id === event.payload.messageId && entry.role === "user", + ); + if (!message) { + const detail = `User message '${event.payload.messageId}' was not found for turn start request.`; yield* appendProviderFailureActivity({ threadId: event.payload.threadId, kind: "provider.turn.start.failed", summary: "Provider turn start failed", - detail: `User message '${event.payload.messageId}' was not found for turn start request.`, + detail, turnId: null, createdAt: event.payload.createdAt, }); + yield* dispatchQueuedTurnFailure(detail, "queued-turn-send-missing-message"); return; } + const messageAttachments = message.attachments ?? []; const isFirstUserMessageTurn = thread.messages.filter((entry) => entry.role === "user").length === 1; @@ -710,7 +758,7 @@ const make = Effect.gen(function* () { }) ?? process.cwd(); const generationInput = { messageText: message.text, - ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), + ...(messageAttachments.length > 0 ? { attachments: messageAttachments } : {}), ...(event.payload.titleSeed !== undefined ? { titleSeed: event.payload.titleSeed } : {}), }; @@ -750,6 +798,7 @@ const make = Effect.gen(function* () { createdAt: event.payload.createdAt, }), ), + Effect.flatMap(() => dispatchQueuedTurnFailure(detail, "queued-turn-send-failed")), Effect.asVoid, ); }; @@ -769,7 +818,7 @@ const make = Effect.gen(function* () { const sendTurnRequest = yield* buildSendTurnRequestForThread({ threadId: event.payload.threadId, messageText: message.text, - ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), + ...(messageAttachments.length > 0 ? { attachments: messageAttachments } : {}), ...(event.payload.modelSelection !== undefined ? { modelSelection: event.payload.modelSelection } : {}), diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 3b2411cba2a..68031c9c353 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -170,7 +170,7 @@ type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][nu async function waitForThread( readModel: () => Promise, predicate: (thread: ProviderRuntimeTestThread) => boolean, - timeoutMs = 2000, + timeoutMs = 5000, threadId: ThreadId = asThreadId("thread-1"), ) { const deadline = (await Effect.runPromise(Clock.currentTimeMillis)) + timeoutMs; @@ -505,6 +505,90 @@ describe("ProviderRuntimeIngestion", () => { ); }); + it("resolves a queued turn only after the runtime confirms turn.started", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + const threadId = asThreadId("thread-1"); + const turnId = asTurnId("turn-queued-resolved"); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.queue", + commandId: CommandId.make("cmd-turn-start-queued-runtime"), + threadId, + message: { + messageId: asMessageId("msg-queued-runtime"), + role: "user", + text: "queued runtime turn", + attachments: [], + }, + createdAt: now, + }), + ); + + const queuedThread = await waitForThread( + harness.readModel, + (thread) => thread.queuedTurns.length === 1 && thread.queuedTurns[0]?.status === "pending", + 2_000, + threadId, + ); + expect(queuedThread.queuedTurns[0]).toMatchObject({ + request: { + message: { + messageId: "msg-queued-runtime", + }, + }, + status: "pending", + }); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.queued-turn.send.start", + commandId: CommandId.make("cmd-queued-send-start-runtime"), + threadId, + mode: "normal", + createdAt: now, + }), + ); + + await waitForThread( + harness.readModel, + (thread) => thread.queuedTurns[0]?.status === "sending", + 2_000, + threadId, + ); + + harness.setProviderSession({ + provider: ProviderDriverKind.make("codex"), + status: "running", + runtimeMode: "approval-required", + threadId, + createdAt: now, + updatedAt: now, + activeTurnId: turnId, + }); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-queued-runtime"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId, + turnId, + }); + + const thread = await waitForThread( + harness.readModel, + (entry) => + entry.queuedTurns.length === 0 && + entry.session?.status === "running" && + entry.session?.activeTurnId === turnId, + 2_000, + threadId, + ); + expect(thread.queuedTurns).toEqual([]); + }); + it("accepts claude turn lifecycle when seeded thread id is a synthetic placeholder", async () => { const harness = await createHarness(); const seededAt = "2026-01-01T00:00:00.000Z"; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 2c07ac91b1e..43b1428b4bb 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1101,28 +1101,6 @@ const make = Effect.gen(function* () { ).pipe(Effect.asVoid); }); - const getSourceProposedPlanReferenceForPendingTurnStart = Effect.fn( - "getSourceProposedPlanReferenceForPendingTurnStart", - )(function* (threadId: ThreadId) { - const pendingTurnStart = yield* projectionTurnRepository.getPendingTurnStartByThreadId({ - threadId, - }); - if (Option.isNone(pendingTurnStart)) { - return null; - } - - const sourceThreadId = pendingTurnStart.value.sourceProposedPlanThreadId; - const sourcePlanId = pendingTurnStart.value.sourceProposedPlanId; - if (sourceThreadId === null || sourcePlanId === null) { - return null; - } - - return { - sourceThreadId, - sourcePlanId, - } as const; - }); - const getExpectedProviderTurnIdForThread = Effect.fn("getExpectedProviderTurnIdForThread")( function* (threadId: ThreadId) { const sessions = yield* providerService.listSessions(); @@ -1131,8 +1109,8 @@ const make = Effect.gen(function* () { }, ); - const getSourceProposedPlanReferenceForAcceptedTurnStart = Effect.fn( - "getSourceProposedPlanReferenceForAcceptedTurnStart", + const getPendingTurnStartForAcceptedTurnStart = Effect.fn( + "getPendingTurnStartForAcceptedTurnStart", )(function* (threadId: ThreadId, eventTurnId: TurnId | undefined) { if (eventTurnId === undefined) { return null; @@ -1143,7 +1121,13 @@ const make = Effect.gen(function* () { return null; } - return yield* getSourceProposedPlanReferenceForPendingTurnStart(threadId); + const pendingTurnStart = yield* projectionTurnRepository.getPendingTurnStartByThreadId({ + threadId, + }); + return Option.match(pendingTurnStart, { + onNone: () => null, + onSome: (value) => value, + }); }); const markSourceProposedPlanImplemented = Effect.fn("markSourceProposedPlanImplemented")( @@ -1225,9 +1209,9 @@ const make = Effect.gen(function* () { return true; } })(); - const acceptedTurnStartedSourcePlan = + const acceptedPendingTurnStart = event.type === "turn.started" && shouldApplyThreadLifecycle - ? yield* getSourceProposedPlanReferenceForAcceptedTurnStart(thread.id, eventTurnId) + ? yield* getPendingTurnStartForAcceptedTurnStart(thread.id, eventTurnId) : null; if ( @@ -1274,10 +1258,31 @@ const make = Effect.gen(function* () { : (thread.session?.lastError ?? null); if (shouldApplyThreadLifecycle) { - if (event.type === "turn.started" && acceptedTurnStartedSourcePlan !== null) { + if ( + event.type === "turn.started" && + acceptedPendingTurnStart !== null && + acceptedPendingTurnStart.queueItemId !== null && + eventTurnId !== undefined + ) { + yield* orchestrationEngine.dispatch({ + type: "thread.queued-turn.resolve", + commandId: providerCommandId(event, "queued-turn-resolve"), + threadId: thread.id, + queueItemId: acceptedPendingTurnStart.queueItemId, + turnId: eventTurnId, + createdAt: now, + }); + } + + if ( + event.type === "turn.started" && + acceptedPendingTurnStart !== null && + acceptedPendingTurnStart.sourceProposedPlanThreadId !== null && + acceptedPendingTurnStart.sourceProposedPlanId !== null + ) { yield* markSourceProposedPlanImplemented( - acceptedTurnStartedSourcePlan.sourceThreadId, - acceptedTurnStartedSourcePlan.sourcePlanId, + acceptedPendingTurnStart.sourceProposedPlanThreadId, + acceptedPendingTurnStart.sourceProposedPlanId, thread.id, now, ).pipe( diff --git a/apps/server/src/orchestration/Layers/QueuedTurnDrainReactor.test.ts b/apps/server/src/orchestration/Layers/QueuedTurnDrainReactor.test.ts new file mode 100644 index 00000000000..166a4fdf50e --- /dev/null +++ b/apps/server/src/orchestration/Layers/QueuedTurnDrainReactor.test.ts @@ -0,0 +1,365 @@ +import { + CommandId, + EventId, + MessageId, + ProjectId, + ProviderInstanceId, + ThreadId, + TurnQueueItemId, + type OrchestrationCommand, + type OrchestrationEvent, + type OrchestrationReadModel, +} from "@t3tools/contracts"; +import * as Effect from "effect/Effect"; +import * as Exit from "effect/Exit"; +import * as Layer from "effect/Layer"; +import * as ManagedRuntime from "effect/ManagedRuntime"; +import * as PubSub from "effect/PubSub"; +import * as Scope from "effect/Scope"; +import * as Stream from "effect/Stream"; +import { describe, expect, it } from "vitest"; + +import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; +import { QueuedTurnDrainReactor } from "../Services/QueuedTurnDrainReactor.ts"; +import { QueuedTurnDrainReactorLive } from "./QueuedTurnDrainReactor.ts"; + +const now = "2026-01-01T00:00:00.000Z"; +const threadId = ThreadId.make("thread-1"); + +function makeSnapshot(input: { + readonly sessionStatus: "ready" | "running"; + readonly queuedStatus?: "pending" | "sending"; + readonly queuedStatuses?: ReadonlyArray<"pending" | "sending">; +}): OrchestrationReadModel { + const queuedStatuses = input.queuedStatuses ?? [input.queuedStatus ?? "pending"]; + return { + snapshotSequence: 1, + projects: [ + { + id: ProjectId.make("project-1"), + title: "Project", + workspaceRoot: "/repo/project", + defaultModelSelection: null, + scripts: [], + createdAt: now, + updatedAt: now, + deletedAt: null, + }, + ], + threads: [ + { + id: threadId, + projectId: ProjectId.make("project-1"), + title: "Thread", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + branch: null, + worktreePath: null, + latestTurn: null, + createdAt: now, + updatedAt: now, + archivedAt: null, + deletedAt: null, + messages: [], + queuedTurns: queuedStatuses.map((status, index) => ({ + queueItemId: TurnQueueItemId.make(`queue-item-${index + 1}`), + request: { + message: { + messageId: MessageId.make(`message-${index + 1}`), + role: "user", + text: `queued message ${index + 1}`, + attachments: [], + }, + }, + status, + failureReason: null, + createdAt: now, + updatedAt: now, + })), + proposedPlans: [], + activities: [], + checkpoints: [], + session: { + threadId, + status: input.sessionStatus, + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + }, + ], + updatedAt: now, + }; +} + +function makeLayer( + snapshot: OrchestrationReadModel, + dispatched: OrchestrationCommand[], + streamDomainEvents: Stream.Stream = Stream.empty, +) { + return QueuedTurnDrainReactorLive.pipe( + Layer.provideMerge( + Layer.succeed(OrchestrationEngineService, { + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents, + }), + ), + Layer.provideMerge( + Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => Effect.succeed(snapshot), + getSnapshot: () => Effect.die("getSnapshot should not be called"), + getShellSnapshot: () => Effect.die("getShellSnapshot should not be called"), + getArchivedShellSnapshot: () => Effect.die("getArchivedShellSnapshot should not be called"), + getActiveProjectByWorkspaceRoot: () => + Effect.die("getActiveProjectByWorkspaceRoot should not be called"), + getProjectShellById: () => Effect.die("getProjectShellById should not be called"), + getFirstActiveThreadIdByProjectId: () => + Effect.die("getFirstActiveThreadIdByProjectId should not be called"), + getThreadDetailById: () => Effect.die("getThreadDetailById should not be called"), + getThreadCheckpointContext: () => + Effect.die("getThreadCheckpointContext should not be called"), + getFullThreadDiffContext: () => Effect.die("getFullThreadDiffContext should not be called"), + getThreadShellById: () => Effect.die("getThreadShellById should not be called"), + getCounts: () => Effect.die("getCounts should not be called"), + getSnapshotSequence: () => Effect.die("getSnapshotSequence should not be called"), + }), + ), + ); +} + +describe("QueuedTurnDrainReactor", () => { + it("claims pending queued turns when a thread is ready", async () => { + const dispatched: OrchestrationCommand[] = []; + const runtime = ManagedRuntime.make( + makeLayer(makeSnapshot({ sessionStatus: "ready", queuedStatus: "pending" }), dispatched), + ); + const scope = await Effect.runPromise(Scope.make("sequential")); + try { + const reactor = await runtime.runPromise(Effect.service(QueuedTurnDrainReactor)); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + expect(dispatched).toHaveLength(1); + expect(dispatched[0]).toMatchObject({ + type: "thread.queued-turn.send.start", + threadId, + mode: "normal", + }); + } finally { + await Effect.runPromise(Scope.close(scope, Exit.void)); + await runtime.dispose(); + } + }); + + it("does not claim queued turns while the thread is running", async () => { + const dispatched: OrchestrationCommand[] = []; + const runtime = ManagedRuntime.make( + makeLayer(makeSnapshot({ sessionStatus: "running", queuedStatus: "pending" }), dispatched), + ); + const scope = await Effect.runPromise(Scope.make("sequential")); + try { + const reactor = await runtime.runPromise(Effect.service(QueuedTurnDrainReactor)); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + expect(dispatched).toHaveLength(0); + } finally { + await Effect.runPromise(Scope.close(scope, Exit.void)); + await runtime.dispose(); + } + }); + + it("recovers sending queued turns on startup", async () => { + const dispatched: OrchestrationCommand[] = []; + const runtime = ManagedRuntime.make( + makeLayer(makeSnapshot({ sessionStatus: "ready", queuedStatus: "sending" }), dispatched), + ); + const scope = await Effect.runPromise(Scope.make("sequential")); + try { + const reactor = await runtime.runPromise(Effect.service(QueuedTurnDrainReactor)); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + expect(dispatched[0]).toMatchObject({ + type: "thread.queued-turn.send.start", + mode: "recover", + }); + } finally { + await Effect.runPromise(Scope.close(scope, Exit.void)); + await runtime.dispose(); + } + }); + + it("does not normal-drain a thread after recovering a sending queued turn on startup", async () => { + const dispatched: OrchestrationCommand[] = []; + const runtime = ManagedRuntime.make( + makeLayer( + makeSnapshot({ + sessionStatus: "ready", + queuedStatuses: ["sending", "pending"], + }), + dispatched, + ), + ); + const scope = await Effect.runPromise(Scope.make("sequential")); + try { + const reactor = await runtime.runPromise(Effect.service(QueuedTurnDrainReactor)); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + expect(dispatched).toHaveLength(1); + expect(dispatched[0]).toMatchObject({ + type: "thread.queued-turn.send.start", + mode: "recover", + }); + } finally { + await Effect.runPromise(Scope.close(scope, Exit.void)); + await runtime.dispose(); + } + }); + + it("does not claim pending queued turns while another queued turn is sending", async () => { + const dispatched: OrchestrationCommand[] = []; + const sessionSetEvent: OrchestrationEvent = { + sequence: 1, + eventId: EventId.make("evt-session-set"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.session-set", + occurredAt: now, + commandId: CommandId.make("cmd-session-set"), + causationEventId: null, + correlationId: CommandId.make("cmd-session-set"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "ready", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + }, + }; + const runtime = ManagedRuntime.make( + makeLayer( + makeSnapshot({ + sessionStatus: "ready", + queuedStatuses: ["sending", "pending"], + }), + dispatched, + Stream.make(sessionSetEvent), + ), + ); + const scope = await Effect.runPromise(Scope.make("sequential")); + try { + const reactor = await runtime.runPromise(Effect.service(QueuedTurnDrainReactor)); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + await Effect.runPromise(Effect.yieldNow); + await Effect.runPromise(reactor.drain); + expect(dispatched).toHaveLength(1); + expect(dispatched[0]).toMatchObject({ + type: "thread.queued-turn.send.start", + mode: "recover", + }); + } finally { + await Effect.runPromise(Scope.close(scope, Exit.void)); + await runtime.dispose(); + } + }); + + it("does not miss a drain trigger emitted during startup scan", async () => { + const dispatched: OrchestrationCommand[] = []; + const sessionSetEvent: OrchestrationEvent = { + sequence: 1, + eventId: EventId.make("evt-session-set-during-startup"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.session-set", + occurredAt: now, + commandId: CommandId.make("cmd-session-set-during-startup"), + causationEventId: null, + correlationId: CommandId.make("cmd-session-set-during-startup"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "ready", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + }, + }; + const pubSub = Effect.runSync(PubSub.unbounded()); + let readCount = 0; + const layer = QueuedTurnDrainReactorLive.pipe( + Layer.provideMerge( + Layer.succeed(OrchestrationEngineService, { + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.fromPubSub(pubSub), + }), + ), + Layer.provideMerge( + Layer.succeed(ProjectionSnapshotQuery, { + getCommandReadModel: () => + Effect.gen(function* () { + readCount += 1; + if (readCount === 1) { + yield* PubSub.publish(pubSub, sessionSetEvent); + return makeSnapshot({ sessionStatus: "ready", queuedStatuses: [] }); + } + return makeSnapshot({ sessionStatus: "ready", queuedStatus: "pending" }); + }), + getSnapshot: () => Effect.die("getSnapshot should not be called"), + getShellSnapshot: () => Effect.die("getShellSnapshot should not be called"), + getArchivedShellSnapshot: () => + Effect.die("getArchivedShellSnapshot should not be called"), + getActiveProjectByWorkspaceRoot: () => + Effect.die("getActiveProjectByWorkspaceRoot should not be called"), + getProjectShellById: () => Effect.die("getProjectShellById should not be called"), + getFirstActiveThreadIdByProjectId: () => + Effect.die("getFirstActiveThreadIdByProjectId should not be called"), + getThreadDetailById: () => Effect.die("getThreadDetailById should not be called"), + getThreadCheckpointContext: () => + Effect.die("getThreadCheckpointContext should not be called"), + getFullThreadDiffContext: () => + Effect.die("getFullThreadDiffContext should not be called"), + getThreadShellById: () => Effect.die("getThreadShellById should not be called"), + getCounts: () => Effect.die("getCounts should not be called"), + getSnapshotSequence: () => Effect.die("getSnapshotSequence should not be called"), + }), + ), + ); + const runtime = ManagedRuntime.make(layer); + const scope = await Effect.runPromise(Scope.make("sequential")); + try { + const reactor = await runtime.runPromise(Effect.service(QueuedTurnDrainReactor)); + await Effect.runPromise(reactor.start().pipe(Scope.provide(scope))); + await Effect.runPromise(reactor.drain); + expect(dispatched).toHaveLength(1); + expect(dispatched[0]).toMatchObject({ + type: "thread.queued-turn.send.start", + mode: "normal", + }); + } finally { + await Effect.runPromise(Scope.close(scope, Exit.void)); + await runtime.dispose(); + } + }); +}); diff --git a/apps/server/src/orchestration/Layers/QueuedTurnDrainReactor.ts b/apps/server/src/orchestration/Layers/QueuedTurnDrainReactor.ts new file mode 100644 index 00000000000..5c43e712a3d --- /dev/null +++ b/apps/server/src/orchestration/Layers/QueuedTurnDrainReactor.ts @@ -0,0 +1,168 @@ +import { + CommandId, + type OrchestrationEvent, + type OrchestrationThread, + type OrchestrationQueuedTurn, + type ThreadId, +} from "@t3tools/contracts"; +import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; +import * as Cause from "effect/Cause"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Random from "effect/Random"; +import * as Stream from "effect/Stream"; + +import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; +import { + QueuedTurnDrainReactor, + type QueuedTurnDrainReactorShape, +} from "../Services/QueuedTurnDrainReactor.ts"; + +type QueueDrainTriggerEvent = Extract< + OrchestrationEvent, + { + type: "thread.turn-queued" | "thread.session-set" | "thread.queued-turn-requeued"; + } +>; + +const nowIso = Effect.map(DateTime.now, DateTime.formatIso); +const serverCommandId = (tag: string) => + Effect.map(Random.nextUUIDv4, (id) => CommandId.make(`server:${tag}:${id}`)); + +type DrainMode = "normal" | "recover"; + +function getQueuedTurnDrainMode(thread: OrchestrationThread): DrainMode | null { + if (thread.session?.status !== "ready") { + return null; + } + if (thread.queuedTurns.some((entry: OrchestrationQueuedTurn) => entry.status === "sending")) { + return "recover"; + } + if (thread.queuedTurns.some((entry: OrchestrationQueuedTurn) => entry.status === "pending")) { + return "normal"; + } + return null; +} + +const canDrainWithMode = (thread: OrchestrationThread, mode: DrainMode) => + getQueuedTurnDrainMode(thread) === mode; + +const make = Effect.fn("makeQueuedTurnDrainReactor")(function* () { + const orchestrationEngine = yield* OrchestrationEngineService; + const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; + + const dispatchDrainIfReady = Effect.fn("dispatchDrainIfReady")(function* ( + threadId: ThreadId, + mode: DrainMode, + ) { + const snapshot = yield* projectionSnapshotQuery.getCommandReadModel(); + const thread = snapshot.threads.find((entry) => entry.id === threadId); + if (!thread) { + return; + } + if (!canDrainWithMode(thread, mode)) { + return; + } + + yield* orchestrationEngine + .dispatch({ + type: "thread.queued-turn.send.start", + commandId: yield* serverCommandId("queued-turn-send-start"), + threadId, + mode, + createdAt: yield* nowIso, + }) + .pipe( + Effect.catchCause((cause) => { + if (Cause.hasInterruptsOnly(cause)) { + return Effect.failCause(cause); + } + return Effect.logWarning("queued turn drain failed to claim next turn", { + threadId, + cause: Cause.pretty(cause), + }); + }), + ); + }); + + const processEvent = Effect.fn("processEvent")(function* (event: QueueDrainTriggerEvent) { + yield* dispatchDrainIfReady(event.payload.threadId, "normal"); + }); + + const processEventSafely = (event: QueueDrainTriggerEvent) => + processEvent(event).pipe( + Effect.catchCause((cause) => { + if (Cause.hasInterruptsOnly(cause)) { + return Effect.failCause(cause); + } + return Effect.logWarning("queued turn drain reactor failed to process event", { + eventType: event.type, + threadId: event.payload.threadId, + cause: Cause.pretty(cause), + }); + }), + ); + + const worker = yield* makeDrainableWorker(processEventSafely); + + const enqueueInitialDrainAttempts = Effect.fn("enqueueInitialDrainAttempts")(function* () { + const snapshot = yield* projectionSnapshotQuery.getCommandReadModel(); + const recoverThreadIds = new Set( + snapshot.threads + .filter((thread) => getQueuedTurnDrainMode(thread) === "recover") + .map((thread) => thread.id), + ); + const normalThreadIds = snapshot.threads + .filter((thread) => getQueuedTurnDrainMode(thread) === "normal") + .map((thread) => thread.id); + + yield* Effect.forEach( + recoverThreadIds, + (threadId) => dispatchDrainIfReady(threadId, "recover"), + { + concurrency: 1, + }, + ); + yield* Effect.forEach(normalThreadIds, (threadId) => dispatchDrainIfReady(threadId, "normal"), { + concurrency: 1, + }); + }); + + const start: QueuedTurnDrainReactorShape["start"] = Effect.fn("start")(function* () { + yield* Effect.forkScoped( + Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => { + if ( + event.type === "thread.turn-queued" || + event.type === "thread.session-set" || + event.type === "thread.queued-turn-requeued" + ) { + return worker.enqueue(event); + } + return Effect.void; + }).pipe( + Effect.catchCause((cause) => + Effect.logWarning("queued turn drain reactor stream failed", { + cause: Cause.pretty(cause), + }), + ), + ), + ); + yield* Effect.yieldNow; + yield* enqueueInitialDrainAttempts().pipe( + Effect.catchCause((cause) => + Effect.logWarning("queued turn drain reactor failed initial drain scan", { + cause: Cause.pretty(cause), + }), + ), + ); + }); + + return { + start, + drain: worker.drain, + } satisfies QueuedTurnDrainReactorShape; +}); + +export const QueuedTurnDrainReactorLive = Layer.effect(QueuedTurnDrainReactor, make()); diff --git a/apps/server/src/orchestration/Normalizer.ts b/apps/server/src/orchestration/Normalizer.ts index 95d29e3d6d2..e0e03519236 100644 --- a/apps/server/src/orchestration/Normalizer.ts +++ b/apps/server/src/orchestration/Normalizer.ts @@ -65,7 +65,7 @@ export const normalizeDispatchCommand = (command: ClientOrchestrationCommand) => } satisfies OrchestrationCommand; } - if (command.type !== "thread.turn.start") { + if (command.type !== "thread.turn.start" && command.type !== "thread.turn.queue") { return command as OrchestrationCommand; } diff --git a/apps/server/src/orchestration/Schemas.ts b/apps/server/src/orchestration/Schemas.ts index f7ebf693440..3afa23cd33f 100644 --- a/apps/server/src/orchestration/Schemas.ts +++ b/apps/server/src/orchestration/Schemas.ts @@ -16,6 +16,11 @@ import { ThreadRevertedPayload as ContractsThreadRevertedPayloadSchema, ThreadActivityAppendedPayload as ContractsThreadActivityAppendedPayloadSchema, ThreadTurnStartRequestedPayload as ContractsThreadTurnStartRequestedPayloadSchema, + ThreadTurnQueuedPayload as ContractsThreadTurnQueuedPayloadSchema, + ThreadQueuedTurnSendFailedPayload as ContractsThreadQueuedTurnSendFailedPayloadSchema, + ThreadQueuedTurnRequeuedPayload as ContractsThreadQueuedTurnRequeuedPayloadSchema, + ThreadQueuedTurnResolvedPayload as ContractsThreadQueuedTurnResolvedPayloadSchema, + ThreadQueuedTurnSendStartedPayload as ContractsThreadQueuedTurnSendStartedPayloadSchema, ThreadTurnInterruptRequestedPayload as ContractsThreadTurnInterruptRequestedPayloadSchema, ThreadApprovalResponseRequestedPayload as ContractsThreadApprovalResponseRequestedPayloadSchema, ThreadCheckpointRevertRequestedPayload as ContractsThreadCheckpointRevertRequestedPayloadSchema, @@ -43,6 +48,11 @@ export const ThreadRevertedPayload = ContractsThreadRevertedPayloadSchema; export const ThreadActivityAppendedPayload = ContractsThreadActivityAppendedPayloadSchema; export const ThreadTurnStartRequestedPayload = ContractsThreadTurnStartRequestedPayloadSchema; +export const ThreadTurnQueuedPayload = ContractsThreadTurnQueuedPayloadSchema; +export const ThreadQueuedTurnSendStartedPayload = ContractsThreadQueuedTurnSendStartedPayloadSchema; +export const ThreadQueuedTurnResolvedPayload = ContractsThreadQueuedTurnResolvedPayloadSchema; +export const ThreadQueuedTurnRequeuedPayload = ContractsThreadQueuedTurnRequeuedPayloadSchema; +export const ThreadQueuedTurnSendFailedPayload = ContractsThreadQueuedTurnSendFailedPayloadSchema; export const ThreadTurnInterruptRequestedPayload = ContractsThreadTurnInterruptRequestedPayloadSchema; export const ThreadApprovalResponseRequestedPayload = diff --git a/apps/server/src/orchestration/Services/QueuedTurnDrainReactor.ts b/apps/server/src/orchestration/Services/QueuedTurnDrainReactor.ts new file mode 100644 index 00000000000..0a773225e18 --- /dev/null +++ b/apps/server/src/orchestration/Services/QueuedTurnDrainReactor.ts @@ -0,0 +1,13 @@ +import * as Context from "effect/Context"; +import type * as Effect from "effect/Effect"; +import type * as Scope from "effect/Scope"; + +export interface QueuedTurnDrainReactorShape { + readonly start: () => Effect.Effect; + readonly drain: Effect.Effect; +} + +export class QueuedTurnDrainReactor extends Context.Service< + QueuedTurnDrainReactor, + QueuedTurnDrainReactorShape +>()("t3/orchestration/Services/QueuedTurnDrainReactor") {} diff --git a/apps/server/src/orchestration/commandInvariants.test.ts b/apps/server/src/orchestration/commandInvariants.test.ts index d52f0535fbb..06898b4dfbc 100644 --- a/apps/server/src/orchestration/commandInvariants.test.ts +++ b/apps/server/src/orchestration/commandInvariants.test.ts @@ -70,6 +70,7 @@ const readModel: OrchestrationReadModel = { archivedAt: null, latestTurn: null, messages: [], + queuedTurns: [], session: null, activities: [], proposedPlans: [], @@ -93,6 +94,7 @@ const readModel: OrchestrationReadModel = { archivedAt: null, latestTurn: null, messages: [], + queuedTurns: [], session: null, activities: [], proposedPlans: [], diff --git a/apps/server/src/orchestration/decider.projectScripts.test.ts b/apps/server/src/orchestration/decider.projectScripts.test.ts index c5b7086eb12..006102db5ba 100644 --- a/apps/server/src/orchestration/decider.projectScripts.test.ts +++ b/apps/server/src/orchestration/decider.projectScripts.test.ts @@ -5,6 +5,7 @@ import { MessageId, ProjectId, ThreadId, + TurnQueueItemId, ProviderInstanceId, } from "@t3tools/contracts"; import { createModelSelection } from "@t3tools/shared/model"; @@ -18,6 +19,72 @@ const asEventId = (value: string): EventId => EventId.make(value); const asProjectId = (value: string): ProjectId => ProjectId.make(value); const asMessageId = (value: string): MessageId => MessageId.make(value); +function makeQueuedTurnRequest(messageId: MessageId, text: string) { + return { + message: { + messageId, + role: "user" as const, + text, + attachments: [], + }, + }; +} + +async function seedThreadReadModel(now: string) { + const withProject = await Effect.runPromise( + projectEvent(createEmptyReadModel(now), { + sequence: 1, + eventId: asEventId("evt-project-create"), + aggregateKind: "project", + aggregateId: asProjectId("project-1"), + type: "project.created", + occurredAt: now, + commandId: CommandId.make("cmd-project-create"), + causationEventId: null, + correlationId: CommandId.make("cmd-project-create"), + metadata: {}, + payload: { + projectId: asProjectId("project-1"), + title: "Project", + workspaceRoot: "/tmp/project", + defaultModelSelection: null, + scripts: [], + createdAt: now, + updatedAt: now, + }, + }), + ); + return Effect.runPromise( + projectEvent(withProject, { + sequence: 2, + eventId: asEventId("evt-thread-create"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-1"), + type: "thread.created", + occurredAt: now, + commandId: CommandId.make("cmd-thread-create"), + causationEventId: null, + correlationId: CommandId.make("cmd-thread-create"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-1"), + projectId: asProjectId("project-1"), + title: "Thread", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }), + ); +} + describe("decider project scripts", () => { it("emits empty scripts on project.create", async () => { const now = "2026-01-01T00:00:00.000Z"; @@ -197,6 +264,344 @@ describe("decider project scripts", () => { }); }); + it("queues thread.turn.queue without requesting provider send", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const readModel = await seedThreadReadModel(now); + + const result = await Effect.runPromise( + decideOrchestrationCommand({ + command: { + type: "thread.turn.queue", + commandId: CommandId.make("cmd-turn-queue"), + threadId: ThreadId.make("thread-1"), + message: { + messageId: asMessageId("message-user-queued"), + role: "user", + text: "queued hello", + attachments: [], + }, + createdAt: now, + }, + readModel, + }), + ); + + const events = Array.isArray(result) ? result : [result]; + expect(events.map((event) => event.type)).toEqual([ + "thread.message-sent", + "thread.turn-queued", + ]); + const queuedEvent = events[1]; + expect(queuedEvent?.causationEventId).toBe(events[0]?.eventId ?? null); + if (queuedEvent?.type !== "thread.turn-queued") { + return; + } + expect(queuedEvent.payload).toMatchObject({ + threadId: ThreadId.make("thread-1"), + request: { + message: { + messageId: asMessageId("message-user-queued"), + }, + }, + }); + }); + + it("rejects normal queued send start while another queued turn is sending", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const threadId = ThreadId.make("thread-1"); + const firstQueueItemId = TurnQueueItemId.make("queue-item-1"); + const secondQueueItemId = TurnQueueItemId.make("queue-item-2"); + const baseReadModel = await seedThreadReadModel(now); + const withFirstQueued = await Effect.runPromise( + projectEvent(baseReadModel, { + sequence: 3, + eventId: asEventId("evt-first-turn-queued"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.turn-queued", + occurredAt: now, + commandId: CommandId.make("cmd-first-turn-queued"), + causationEventId: null, + correlationId: CommandId.make("cmd-first-turn-queued"), + metadata: {}, + payload: { + threadId, + queueItemId: firstQueueItemId, + request: makeQueuedTurnRequest(asMessageId("message-first-queued"), "first queued"), + createdAt: now, + }, + }), + ); + const withFirstSending = await Effect.runPromise( + projectEvent(withFirstQueued, { + sequence: 4, + eventId: asEventId("evt-first-queued-started"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.queued-turn-send-started", + occurredAt: now, + commandId: CommandId.make("cmd-first-queued-started"), + causationEventId: null, + correlationId: CommandId.make("cmd-first-queued-started"), + metadata: {}, + payload: { + threadId, + queueItemId: firstQueueItemId, + messageId: asMessageId("message-first-queued"), + createdAt: now, + }, + }), + ); + const withSecondQueued = await Effect.runPromise( + projectEvent(withFirstSending, { + sequence: 5, + eventId: asEventId("evt-second-turn-queued"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.turn-queued", + occurredAt: now, + commandId: CommandId.make("cmd-second-turn-queued"), + causationEventId: null, + correlationId: CommandId.make("cmd-second-turn-queued"), + metadata: {}, + payload: { + threadId, + queueItemId: secondQueueItemId, + request: makeQueuedTurnRequest(asMessageId("message-second-queued"), "second queued"), + createdAt: now, + }, + }), + ); + const readModel = await Effect.runPromise( + projectEvent(withSecondQueued, { + sequence: 6, + eventId: asEventId("evt-session-ready"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.session-set", + occurredAt: now, + commandId: CommandId.make("cmd-session-ready"), + causationEventId: null, + correlationId: CommandId.make("cmd-session-ready"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + }, + }), + ); + + await expect( + Effect.runPromise( + decideOrchestrationCommand({ + command: { + type: "thread.queued-turn.send.start", + commandId: CommandId.make("cmd-normal-send-start"), + threadId, + mode: "normal", + createdAt: now, + }, + readModel, + }), + ), + ).rejects.toThrow("already has a queued turn being sent"); + }); + + it("emits queued send started and shared turn-start-requested when starting a queued turn", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const threadId = ThreadId.make("thread-1"); + const queueItemId = TurnQueueItemId.make("queue-item-1"); + const baseReadModel = await seedThreadReadModel(now); + const withQueuedTurn = await Effect.runPromise( + projectEvent(baseReadModel, { + sequence: 3, + eventId: asEventId("evt-turn-queued"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.turn-queued", + occurredAt: now, + commandId: CommandId.make("cmd-turn-queued"), + causationEventId: null, + correlationId: CommandId.make("cmd-turn-queued"), + metadata: {}, + payload: { + threadId, + queueItemId, + request: makeQueuedTurnRequest(asMessageId("message-user-queued"), "queued hello"), + createdAt: now, + }, + }), + ); + const readModel = await Effect.runPromise( + projectEvent(withQueuedTurn, { + sequence: 4, + eventId: asEventId("evt-session-ready-for-queued-send"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.session-set", + occurredAt: now, + commandId: CommandId.make("cmd-session-ready-for-queued-send"), + causationEventId: null, + correlationId: CommandId.make("cmd-session-ready-for-queued-send"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + }, + }), + ); + + const result = await Effect.runPromise( + decideOrchestrationCommand({ + command: { + type: "thread.queued-turn.send.start", + commandId: CommandId.make("cmd-queued-send-start"), + threadId, + mode: "normal", + createdAt: now, + }, + readModel, + }), + ); + + expect(Array.isArray(result)).toBe(true); + const events = Array.isArray(result) ? result : [result]; + expect(events.map((event) => event.type)).toEqual([ + "thread.queued-turn-send-started", + "thread.turn-start-requested", + ]); + const turnStartRequestedEvent = events[1]; + expect(turnStartRequestedEvent?.causationEventId).toBe(events[0]?.eventId ?? null); + if (turnStartRequestedEvent?.type !== "thread.turn-start-requested") { + return; + } + expect(turnStartRequestedEvent.payload).toMatchObject({ + threadId, + queueItemId, + messageId: asMessageId("message-user-queued"), + queuedRequest: { + message: { + messageId: asMessageId("message-user-queued"), + text: "queued hello", + }, + }, + runtimeMode: "approval-required", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + }); + }); + + it("requeues failed queued turns back to pending", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const threadId = ThreadId.make("thread-1"); + const queueItemId = TurnQueueItemId.make("queue-item-1"); + const baseReadModel = await seedThreadReadModel(now); + const withQueuedTurn = await Effect.runPromise( + projectEvent(baseReadModel, { + sequence: 3, + eventId: asEventId("evt-turn-queued-for-retry"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.turn-queued", + occurredAt: now, + commandId: CommandId.make("cmd-turn-queued-for-retry"), + causationEventId: null, + correlationId: CommandId.make("cmd-turn-queued-for-retry"), + metadata: {}, + payload: { + threadId, + queueItemId, + request: makeQueuedTurnRequest( + asMessageId("message-user-queued-retry"), + "queued retry hello", + ), + createdAt: now, + }, + }), + ); + const withSendingTurn = await Effect.runPromise( + projectEvent(withQueuedTurn, { + sequence: 4, + eventId: asEventId("evt-turn-sending-for-retry"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.queued-turn-send-started", + occurredAt: now, + commandId: CommandId.make("cmd-turn-sending-for-retry"), + causationEventId: null, + correlationId: CommandId.make("cmd-turn-sending-for-retry"), + metadata: {}, + payload: { + threadId, + queueItemId, + messageId: asMessageId("message-user-queued-retry"), + createdAt: now, + }, + }), + ); + const readModel = await Effect.runPromise( + projectEvent(withSendingTurn, { + sequence: 5, + eventId: asEventId("evt-turn-failed-for-retry"), + aggregateKind: "thread", + aggregateId: threadId, + type: "thread.queued-turn-send-failed", + occurredAt: now, + commandId: CommandId.make("cmd-turn-failed-for-retry"), + causationEventId: null, + correlationId: CommandId.make("cmd-turn-failed-for-retry"), + metadata: {}, + payload: { + threadId, + queueItemId, + messageId: asMessageId("message-user-queued-retry"), + reason: "Provider send failed.", + createdAt: now, + }, + }), + ); + + const result = await Effect.runPromise( + decideOrchestrationCommand({ + command: { + type: "thread.queued-turn.retry", + commandId: CommandId.make("cmd-queued-turn-retry"), + threadId, + queueItemId, + createdAt: now, + }, + readModel, + }), + ); + + expect(Array.isArray(result)).toBe(false); + if (Array.isArray(result)) { + return; + } + expect(result).toMatchObject({ + type: "thread.queued-turn-requeued", + payload: { + threadId, + queueItemId, + }, + }); + }); + it("emits thread.runtime-mode-set from thread.runtime-mode.set", async () => { const now = "2026-01-01T00:00:00.000Z"; const initial = createEmptyReadModel(now); diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 1004c945dbf..178c3f7940d 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -3,8 +3,10 @@ import type { OrchestrationEvent, OrchestrationReadModel, } from "@t3tools/contracts"; +import { TurnQueueItemId } from "@t3tools/contracts"; import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; +import * as Random from "effect/Random"; import { OrchestrationCommandInvariantError } from "./Errors.ts"; import { @@ -20,6 +22,10 @@ import { projectEvent } from "./projector.ts"; const nowIso = Effect.map(DateTime.now, DateTime.formatIso); +const newTurnQueueItemId = Effect.map(Random.nextUUIDv4, (id) => + TurnQueueItemId.make(`queue-item:${id}`), +); + function withEventBase( input: Pick & { readonly aggregateKind: OrchestrationEvent["aggregateKind"]; @@ -371,6 +377,26 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); + const userMessageEvent: Omit = { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.message-sent", + payload: { + threadId: command.threadId, + messageId: command.message.messageId, + role: "user", + text: command.message.text, + attachments: command.message.attachments, + turnId: null, + streaming: false, + createdAt: command.createdAt, + updatedAt: command.createdAt, + }, + }; const sourceProposedPlan = command.sourceProposedPlan; const sourceThread = sourceProposedPlan ? yield* requireThread({ @@ -395,6 +421,37 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" detail: `Proposed plan '${sourceProposedPlan?.planId}' belongs to thread '${sourceThread.id}' in a different project.`, }); } + const turnStartRequestedEvent: Omit = { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + causationEventId: userMessageEvent.eventId, + type: "thread.turn-start-requested", + payload: { + threadId: command.threadId, + messageId: command.message.messageId, + ...(command.modelSelection !== undefined + ? { modelSelection: command.modelSelection } + : {}), + ...(command.titleSeed !== undefined ? { titleSeed: command.titleSeed } : {}), + runtimeMode: targetThread.runtimeMode, + interactionMode: targetThread.interactionMode, + ...(sourceProposedPlan !== undefined ? { sourceProposedPlan } : {}), + createdAt: command.createdAt, + }, + }; + return [userMessageEvent, turnStartRequestedEvent]; + } + + case "thread.turn.queue": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); const userMessageEvent: Omit = { ...withEventBase({ aggregateKind: "thread", @@ -415,7 +472,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" updatedAt: command.createdAt, }, }; - const turnStartRequestedEvent: Omit = { + const turnQueuedEvent: Omit = { ...withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, @@ -423,21 +480,208 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" commandId: command.commandId, }), causationEventId: userMessageEvent.eventId, + type: "thread.turn-queued", + payload: { + threadId: command.threadId, + queueItemId: yield* newTurnQueueItemId, + request: { + message: command.message, + }, + createdAt: command.createdAt, + }, + }; + return [userMessageEvent, turnQueuedEvent]; + } + + case "thread.queued-turn.retry": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const queuedTurn = thread.queuedTurns.find( + (entry) => entry.queueItemId === command.queueItemId, + ); + if (!queuedTurn) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued turn '${command.queueItemId}' does not exist on thread '${command.threadId}'.`, + }); + } + if (queuedTurn.status !== "failed") { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued turn '${command.queueItemId}' is not failed and cannot be retried.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-turn-requeued", + payload: { + threadId: command.threadId, + queueItemId: queuedTurn.queueItemId, + messageId: queuedTurn.request.message.messageId, + createdAt: command.createdAt, + }, + }; + } + + case "thread.queued-turn.send.start": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const queuedTurn = thread.queuedTurns.find((entry) => + command.mode === "recover" ? entry.status === "sending" : entry.status === "pending", + ); + if (!queuedTurn) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Thread '${command.threadId}' has no ${command.mode === "recover" ? "recoverable sending" : "pending"} queued turn to send.`, + }); + } + if ( + command.mode === "normal" && + thread.queuedTurns.some((entry) => entry.status === "sending") + ) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Thread '${command.threadId}' already has a queued turn being sent.`, + }); + } + if (thread.session?.status !== "ready") { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Thread '${command.threadId}' is not ready to send a queued turn.`, + }); + } + const turnStartRequestedEvent: Omit = { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), type: "thread.turn-start-requested", payload: { threadId: command.threadId, - messageId: command.message.messageId, - ...(command.modelSelection !== undefined - ? { modelSelection: command.modelSelection } - : {}), - ...(command.titleSeed !== undefined ? { titleSeed: command.titleSeed } : {}), - runtimeMode: targetThread.runtimeMode, - interactionMode: targetThread.interactionMode, - ...(sourceProposedPlan !== undefined ? { sourceProposedPlan } : {}), + messageId: queuedTurn.request.message.messageId, + runtimeMode: thread.runtimeMode, + interactionMode: thread.interactionMode, + queueItemId: queuedTurn.queueItemId, + queuedRequest: queuedTurn.request, + createdAt: command.createdAt, + }, + }; + if (command.mode === "recover") { + return turnStartRequestedEvent; + } + const queuedTurnSendStartedEvent: Omit = { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-turn-send-started", + payload: { + threadId: command.threadId, + queueItemId: queuedTurn.queueItemId, + messageId: queuedTurn.request.message.messageId, + createdAt: command.createdAt, + }, + }; + return [ + queuedTurnSendStartedEvent, + { + ...turnStartRequestedEvent, + causationEventId: queuedTurnSendStartedEvent.eventId, + }, + ]; + } + + case "thread.queued-turn.send.fail": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const queuedTurn = thread.queuedTurns.find( + (entry) => entry.queueItemId === command.queueItemId, + ); + if (!queuedTurn) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued turn '${command.queueItemId}' does not exist on thread '${command.threadId}'.`, + }); + } + if (queuedTurn.status !== "sending") { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued turn '${command.queueItemId}' is not sending and cannot fail.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-turn-send-failed", + payload: { + threadId: command.threadId, + queueItemId: queuedTurn.queueItemId, + messageId: queuedTurn.request.message.messageId, + reason: command.reason, + createdAt: command.createdAt, + }, + }; + } + + case "thread.queued-turn.resolve": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const queuedTurn = thread.queuedTurns.find( + (entry) => entry.queueItemId === command.queueItemId, + ); + if (!queuedTurn) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued turn '${command.queueItemId}' does not exist on thread '${command.threadId}'.`, + }); + } + if (queuedTurn.status !== "sending") { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued turn '${command.queueItemId}' is not sending and cannot resolve.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-turn-resolved", + payload: { + threadId: command.threadId, + queueItemId: queuedTurn.queueItemId, + messageId: queuedTurn.request.message.messageId, + turnId: command.turnId, createdAt: command.createdAt, }, }; - return [userMessageEvent, turnStartRequestedEvent]; } case "thread.turn.interrupt": { diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 01dcb9abeac..c3ab29eda90 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -91,6 +91,7 @@ describe("orchestration projector", () => { archivedAt: null, deletedAt: null, messages: [], + queuedTurns: [], proposedPlans: [], activities: [], checkpoints: [], @@ -99,6 +100,165 @@ describe("orchestration projector", () => { ]); }); + it("projects queued turn lifecycle events", async () => { + const now = "2026-01-01T00:00:00.000Z"; + const created = await Effect.runPromise( + projectEvent( + createEmptyReadModel(now), + makeEvent({ + sequence: 1, + type: "thread.created", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-thread-create", + payload: { + threadId: "thread-1", + projectId: "project-1", + title: "demo", + modelSelection: { + provider: ProviderDriverKind.make("codex"), + model: "gpt-5-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }), + ), + ); + + const queued = await Effect.runPromise( + projectEvent( + created, + makeEvent({ + sequence: 2, + type: "thread.turn-queued", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-turn-queue", + payload: { + threadId: "thread-1", + queueItemId: "queue-item-1", + request: { + message: { + messageId: "message-1", + role: "user", + text: "queued message", + attachments: [], + }, + }, + createdAt: now, + }, + }), + ), + ); + expect(queued.threads[0]?.queuedTurns[0]).toMatchObject({ + queueItemId: "queue-item-1", + request: { + message: { + messageId: "message-1", + }, + }, + status: "pending", + }); + + const sending = await Effect.runPromise( + projectEvent( + queued, + makeEvent({ + sequence: 3, + type: "thread.queued-turn-send-started", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-queued-turn-send-started", + payload: { + threadId: "thread-1", + queueItemId: "queue-item-1", + messageId: "message-1", + createdAt: now, + }, + }), + ), + ); + expect(sending.threads[0]?.queuedTurns[0]?.status).toBe("sending"); + + const failed = await Effect.runPromise( + projectEvent( + sending, + makeEvent({ + sequence: 4, + type: "thread.queued-turn-send-failed", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-queued-turn-send-failed", + payload: { + threadId: "thread-1", + queueItemId: "queue-item-1", + messageId: "message-1", + reason: "Provider rejected queued turn.", + createdAt: now, + }, + }), + ), + ); + expect(failed.threads[0]?.queuedTurns[0]).toMatchObject({ + status: "failed", + failureReason: "Provider rejected queued turn.", + }); + + const requeued = await Effect.runPromise( + projectEvent( + failed, + makeEvent({ + sequence: 5, + type: "thread.queued-turn-requeued", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-queued-turn-requeued", + payload: { + threadId: "thread-1", + queueItemId: "queue-item-1", + messageId: "message-1", + createdAt: now, + }, + }), + ), + ); + expect(requeued.threads[0]?.queuedTurns[0]).toMatchObject({ + status: "pending", + failureReason: null, + }); + + const resolved = await Effect.runPromise( + projectEvent( + requeued, + makeEvent({ + sequence: 6, + type: "thread.queued-turn-resolved", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-queued-turn-resolved", + payload: { + threadId: "thread-1", + queueItemId: "queue-item-1", + messageId: "message-1", + turnId: "turn-1", + createdAt: now, + }, + }), + ), + ); + expect(resolved.threads[0]?.queuedTurns).toEqual([]); + }); + it("fails when event payload cannot be decoded by runtime schema", async () => { const now = "2026-01-01T00:00:00.000Z"; const model = createEmptyReadModel(now); @@ -628,10 +788,31 @@ describe("orchestration projector", () => { }), makeEvent({ sequence: 10, - type: "thread.reverted", + type: "thread.turn-queued", aggregateKind: "thread", aggregateId: "thread-1", occurredAt: "2026-02-23T10:00:05.000Z", + commandId: "cmd-turn-queued-before-revert", + payload: { + threadId: "thread-1", + queueItemId: "queue-item-revert", + request: { + message: { + messageId: "message-revert-queued", + role: "user", + text: "Queued work survives revert", + attachments: [], + }, + }, + createdAt: "2026-02-23T10:00:05.000Z", + }, + }), + makeEvent({ + sequence: 11, + type: "thread.reverted", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: "2026-02-23T10:00:05.500Z", commandId: "cmd-revert", payload: { threadId: "thread-1", @@ -658,6 +839,18 @@ describe("orchestration projector", () => { ).toEqual([{ id: "activity-1", turnId: "turn-1" }]); expect(thread?.checkpoints.map((checkpoint) => checkpoint.checkpointTurnCount)).toEqual([1]); expect(thread?.latestTurn?.turnId).toBe("turn-1"); + expect(thread?.queuedTurns).toMatchObject([ + { + queueItemId: "queue-item-revert", + request: { + message: { + messageId: "message-revert-queued", + text: "Queued work survives revert", + }, + }, + status: "pending", + }, + ]); }); it("does not fallback-retain messages tied to removed turn IDs", async () => { diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 0c92f965433..6ee13942665 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -5,6 +5,11 @@ import { OrchestrationSession, OrchestrationThread, } from "@t3tools/contracts"; +import { + applyQueuedTurnLifecycleEvent, + applyQueuedTurnLifecycleOperation, + getQueuedTurnLifecycleOperation, +} from "@t3tools/shared/orchestrationQueue"; import * as Effect from "effect/Effect"; import * as Schema from "effect/Schema"; @@ -20,8 +25,13 @@ import { ThreadDeletedPayload, ThreadInteractionModeSetPayload, ThreadMetaUpdatedPayload, + ThreadQueuedTurnSendFailedPayload, + ThreadQueuedTurnRequeuedPayload, + ThreadQueuedTurnResolvedPayload, + ThreadQueuedTurnSendStartedPayload, ThreadProposedPlanUpsertedPayload, ThreadRuntimeModeSetPayload, + ThreadTurnQueuedPayload, ThreadUnarchivedPayload, ThreadRevertedPayload, ThreadSessionSetPayload, @@ -265,6 +275,7 @@ export function projectEvent( archivedAt: null, deletedAt: null, messages: [], + queuedTurns: [], activities: [], checkpoints: [], session: null, @@ -418,6 +429,133 @@ export function projectEvent( }; }); + case "thread.turn-queued": + return Effect.gen(function* () { + const payload = yield* decodeForEvent( + ThreadTurnQueuedPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const operation = getQueuedTurnLifecycleOperation({ + type: event.type, + payload, + }); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + queuedTurns: applyQueuedTurnLifecycleOperation(thread.queuedTurns, operation), + updatedAt: event.occurredAt, + }), + }; + }); + + case "thread.queued-turn-send-started": + return decodeForEvent( + ThreadQueuedTurnSendStartedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + queuedTurns: applyQueuedTurnLifecycleEvent(thread.queuedTurns, { + type: event.type, + payload, + }), + updatedAt: event.occurredAt, + }), + }; + }), + ); + + case "thread.queued-turn-resolved": + return decodeForEvent( + ThreadQueuedTurnResolvedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + queuedTurns: applyQueuedTurnLifecycleEvent(thread.queuedTurns, { + type: event.type, + payload, + }), + updatedAt: event.occurredAt, + }), + }; + }), + ); + + case "thread.queued-turn-requeued": + return decodeForEvent( + ThreadQueuedTurnRequeuedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + queuedTurns: applyQueuedTurnLifecycleEvent(thread.queuedTurns, { + type: event.type, + payload, + }), + updatedAt: event.occurredAt, + }), + }; + }), + ); + + case "thread.queued-turn-send-failed": + return decodeForEvent( + ThreadQueuedTurnSendFailedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + queuedTurns: applyQueuedTurnLifecycleEvent(thread.queuedTurns, { + type: event.type, + payload, + }), + updatedAt: event.occurredAt, + }), + }; + }), + ); + case "thread.session-set": return Effect.gen(function* () { const payload = yield* decodeForEvent( diff --git a/apps/server/src/persistence/Layers/ProjectionQueuedTurns.ts b/apps/server/src/persistence/Layers/ProjectionQueuedTurns.ts new file mode 100644 index 00000000000..c9c2e427019 --- /dev/null +++ b/apps/server/src/persistence/Layers/ProjectionQueuedTurns.ts @@ -0,0 +1,206 @@ +import { ThreadId, ThreadQueuedTurnRequest, TurnQueueItemId } from "@t3tools/contracts"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; + +import { toPersistenceSqlError } from "../Errors.ts"; +import { + DeleteProjectionQueuedTurnByQueueItemIdInput, + DeleteProjectionQueuedTurnsInput, + ListProjectionQueuedTurnsInput, + ProjectionQueuedTurn, + ProjectionQueuedTurnRepository, + type ProjectionQueuedTurnRepositoryShape, +} from "../Services/ProjectionQueuedTurns.ts"; + +const ProjectionQueuedTurnRow = Schema.Struct({ + queueItemId: TurnQueueItemId, + threadId: ThreadId, + request: Schema.fromJsonString(ThreadQueuedTurnRequest), + status: ProjectionQueuedTurn.fields.status, + failureReason: ProjectionQueuedTurn.fields.failureReason, + createdAt: ProjectionQueuedTurn.fields.createdAt, + updatedAt: ProjectionQueuedTurn.fields.updatedAt, +}); + +const encodeRequestJson = Schema.encodeUnknownSync(Schema.fromJsonString(ThreadQueuedTurnRequest)); + +const GetProjectionQueuedTurnInput = Schema.Struct({ + queueItemId: TurnQueueItemId, +}); + +function mapProjectionQueuedTurnRow( + row: Schema.Schema.Type, +): ProjectionQueuedTurn { + return { + queueItemId: row.queueItemId, + threadId: row.threadId, + request: row.request, + status: row.status, + failureReason: row.failureReason, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }; +} + +const makeProjectionQueuedTurnRepository = Effect.fn("makeProjectionQueuedTurnRepository")( + function* () { + const sql = yield* SqlClient.SqlClient; + + const upsertProjectionQueuedTurnRow = SqlSchema.void({ + Request: ProjectionQueuedTurn, + execute: (row) => + sql` + INSERT INTO projection_queued_turns ( + queue_item_id, + thread_id, + request_json, + status, + failure_reason, + created_at, + updated_at + ) + VALUES ( + ${row.queueItemId}, + ${row.threadId}, + ${encodeRequestJson(row.request)}, + ${row.status}, + ${row.failureReason}, + ${row.createdAt}, + ${row.updatedAt} + ) + ON CONFLICT (queue_item_id) + DO UPDATE SET + thread_id = excluded.thread_id, + request_json = excluded.request_json, + status = excluded.status, + failure_reason = excluded.failure_reason, + created_at = excluded.created_at, + updated_at = excluded.updated_at + `, + }); + + const getProjectionQueuedTurnRow = SqlSchema.findOneOption({ + Request: GetProjectionQueuedTurnInput, + Result: ProjectionQueuedTurnRow, + execute: ({ queueItemId }) => + sql` + SELECT + queue.queue_item_id AS "queueItemId", + queue.thread_id AS "threadId", + queue.request_json AS "request", + queue.status AS status, + queue.failure_reason AS "failureReason", + queue.created_at AS "createdAt", + queue.updated_at AS "updatedAt" + FROM projection_queued_turns AS queue + WHERE queue.queue_item_id = ${queueItemId} + `, + }); + + const listProjectionQueuedTurnRows = SqlSchema.findAll({ + Request: ListProjectionQueuedTurnsInput, + Result: ProjectionQueuedTurnRow, + execute: (input) => + input.threadId === undefined + ? sql` + SELECT + queue.queue_item_id AS "queueItemId", + queue.thread_id AS "threadId", + queue.request_json AS "request", + queue.status AS status, + queue.failure_reason AS "failureReason", + queue.created_at AS "createdAt", + queue.updated_at AS "updatedAt" + FROM projection_queued_turns AS queue + ORDER BY queue.created_at ASC, queue.queue_item_id ASC + ` + : sql` + SELECT + queue.queue_item_id AS "queueItemId", + queue.thread_id AS "threadId", + queue.request_json AS "request", + queue.status AS status, + queue.failure_reason AS "failureReason", + queue.created_at AS "createdAt", + queue.updated_at AS "updatedAt" + FROM projection_queued_turns AS queue + WHERE queue.thread_id = ${input.threadId} + ORDER BY queue.created_at ASC, queue.queue_item_id ASC + `, + }); + + const deleteProjectionQueuedTurnRows = SqlSchema.void({ + Request: DeleteProjectionQueuedTurnsInput, + execute: ({ threadId }) => + sql` + DELETE FROM projection_queued_turns + WHERE thread_id = ${threadId} + `, + }); + + const deleteProjectionQueuedTurnRowByQueueItemId = SqlSchema.void({ + Request: DeleteProjectionQueuedTurnByQueueItemIdInput, + execute: ({ queueItemId }) => + sql` + DELETE FROM projection_queued_turns + WHERE queue_item_id = ${queueItemId} + `, + }); + + const upsert: ProjectionQueuedTurnRepositoryShape["upsert"] = (row) => + upsertProjectionQueuedTurnRow(row).pipe( + Effect.mapError(toPersistenceSqlError("ProjectionQueuedTurnRepository.upsert:query")), + ); + + const getByQueueItemId: ProjectionQueuedTurnRepositoryShape["getByQueueItemId"] = (input) => + getProjectionQueuedTurnRow(input).pipe( + Effect.map( + Option.map((row: Schema.Schema.Type) => + mapProjectionQueuedTurnRow(row), + ), + ), + Effect.mapError( + toPersistenceSqlError("ProjectionQueuedTurnRepository.getByQueueItemId:query"), + ), + ); + + const list: ProjectionQueuedTurnRepositoryShape["list"] = (input = {}) => + listProjectionQueuedTurnRows(input).pipe( + Effect.map((rows) => rows.map(mapProjectionQueuedTurnRow)), + Effect.mapError(toPersistenceSqlError("ProjectionQueuedTurnRepository.list:query")), + ); + + const deleteByThreadId: ProjectionQueuedTurnRepositoryShape["deleteByThreadId"] = (input) => + deleteProjectionQueuedTurnRows(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionQueuedTurnRepository.deleteByThreadId:query"), + ), + ); + + const deleteByQueueItemId: ProjectionQueuedTurnRepositoryShape["deleteByQueueItemId"] = ( + input, + ) => + deleteProjectionQueuedTurnRowByQueueItemId(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionQueuedTurnRepository.deleteByQueueItemId:query"), + ), + ); + + return { + upsert, + getByQueueItemId, + list, + deleteByThreadId, + deleteByQueueItemId, + } satisfies ProjectionQueuedTurnRepositoryShape; + }, +); + +export const ProjectionQueuedTurnRepositoryLive = Layer.effect( + ProjectionQueuedTurnRepository, + makeProjectionQueuedTurnRepository(), +); diff --git a/apps/server/src/persistence/Layers/ProjectionTurns.ts b/apps/server/src/persistence/Layers/ProjectionTurns.ts index bd57a4eaa30..a1de7c0baac 100644 --- a/apps/server/src/persistence/Layers/ProjectionTurns.ts +++ b/apps/server/src/persistence/Layers/ProjectionTurns.ts @@ -116,6 +116,7 @@ const makeProjectionTurnRepository = Effect.gen(function* () { thread_id, turn_id, pending_message_id, + queue_item_id, source_proposed_plan_thread_id, source_proposed_plan_id, assistant_message_id, @@ -132,6 +133,7 @@ const makeProjectionTurnRepository = Effect.gen(function* () { ${row.threadId}, NULL, ${row.messageId}, + ${row.queueItemId}, ${row.sourceProposedPlanThreadId}, ${row.sourceProposedPlanId}, NULL, @@ -155,6 +157,7 @@ const makeProjectionTurnRepository = Effect.gen(function* () { SELECT thread_id AS "threadId", pending_message_id AS "messageId", + queue_item_id AS "queueItemId", source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", source_proposed_plan_id AS "sourceProposedPlanId", requested_at AS "requestedAt" diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index cc5024d5f51..f521bf923ea 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -43,6 +43,7 @@ import Migration0027 from "./Migrations/027_ProviderSessionRuntimeInstanceId.ts" import Migration0028 from "./Migrations/028_ProjectionThreadSessionInstanceId.ts"; import Migration0029 from "./Migrations/029_ProjectionThreadDetailOrderingIndexes.ts"; import Migration0030 from "./Migrations/030_ProjectionThreadShellArchiveIndexes.ts"; +import Migration0031 from "./Migrations/031_ProjectionQueuedTurns.ts"; /** * Migration loader with all migrations defined inline. @@ -85,6 +86,7 @@ export const migrationEntries = [ [28, "ProjectionThreadSessionInstanceId", Migration0028], [29, "ProjectionThreadDetailOrderingIndexes", Migration0029], [30, "ProjectionThreadShellArchiveIndexes", Migration0030], + [31, "ProjectionQueuedTurns", Migration0031], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/031_ProjectionQueuedTurns.ts b/apps/server/src/persistence/Migrations/031_ProjectionQueuedTurns.ts new file mode 100644 index 00000000000..903ddb6a65f --- /dev/null +++ b/apps/server/src/persistence/Migrations/031_ProjectionQueuedTurns.ts @@ -0,0 +1,28 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + ALTER TABLE projection_turns + ADD COLUMN queue_item_id TEXT + `; + + yield* sql` + CREATE TABLE IF NOT EXISTS projection_queued_turns ( + queue_item_id TEXT PRIMARY KEY, + thread_id TEXT NOT NULL, + request_json TEXT NOT NULL, + status TEXT NOT NULL, + failure_reason TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_queued_turns_thread_status_created + ON projection_queued_turns(thread_id, status, created_at) + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionQueuedTurns.ts b/apps/server/src/persistence/Services/ProjectionQueuedTurns.ts new file mode 100644 index 00000000000..c106e2bb1fc --- /dev/null +++ b/apps/server/src/persistence/Services/ProjectionQueuedTurns.ts @@ -0,0 +1,61 @@ +import { + IsoDateTime, + OrchestrationQueuedTurnStatus, + ThreadId, + ThreadQueuedTurnRequest, + TurnQueueItemId, +} from "@t3tools/contracts"; +import * as Context from "effect/Context"; +import type * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; + +import type { ProjectionRepositoryError } from "../Errors.ts"; + +export const ProjectionQueuedTurn = Schema.Struct({ + queueItemId: TurnQueueItemId, + threadId: ThreadId, + request: ThreadQueuedTurnRequest, + status: OrchestrationQueuedTurnStatus, + failureReason: Schema.NullOr(Schema.String), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, +}); +export type ProjectionQueuedTurn = typeof ProjectionQueuedTurn.Type; + +export const ListProjectionQueuedTurnsInput = Schema.Struct({ + threadId: Schema.optional(ThreadId), +}); +export type ListProjectionQueuedTurnsInput = typeof ListProjectionQueuedTurnsInput.Type; + +export const DeleteProjectionQueuedTurnsInput = Schema.Struct({ + threadId: ThreadId, +}); +export type DeleteProjectionQueuedTurnsInput = typeof DeleteProjectionQueuedTurnsInput.Type; + +export const DeleteProjectionQueuedTurnByQueueItemIdInput = Schema.Struct({ + queueItemId: TurnQueueItemId, +}); +export type DeleteProjectionQueuedTurnByQueueItemIdInput = + typeof DeleteProjectionQueuedTurnByQueueItemIdInput.Type; + +export interface ProjectionQueuedTurnRepositoryShape { + readonly upsert: (row: ProjectionQueuedTurn) => Effect.Effect; + readonly getByQueueItemId: (input: { + readonly queueItemId: TurnQueueItemId; + }) => Effect.Effect, ProjectionRepositoryError>; + readonly list: ( + input?: ListProjectionQueuedTurnsInput, + ) => Effect.Effect, ProjectionRepositoryError>; + readonly deleteByThreadId: ( + input: DeleteProjectionQueuedTurnsInput, + ) => Effect.Effect; + readonly deleteByQueueItemId: ( + input: DeleteProjectionQueuedTurnByQueueItemIdInput, + ) => Effect.Effect; +} + +export class ProjectionQueuedTurnRepository extends Context.Service< + ProjectionQueuedTurnRepository, + ProjectionQueuedTurnRepositoryShape +>()("t3/persistence/Services/ProjectionQueuedTurns/ProjectionQueuedTurnRepository") {} diff --git a/apps/server/src/persistence/Services/ProjectionTurns.ts b/apps/server/src/persistence/Services/ProjectionTurns.ts index f3d5d5e4706..73831d9855a 100644 --- a/apps/server/src/persistence/Services/ProjectionTurns.ts +++ b/apps/server/src/persistence/Services/ProjectionTurns.ts @@ -16,6 +16,7 @@ import { OrchestrationCheckpointStatus, ThreadId, TurnId, + TurnQueueItemId, } from "@t3tools/contracts"; import * as Option from "effect/Option"; import * as Schema from "effect/Schema"; @@ -72,6 +73,7 @@ export type ProjectionTurnById = typeof ProjectionTurnById.Type; export const ProjectionPendingTurnStart = Schema.Struct({ threadId: ThreadId, messageId: MessageId, + queueItemId: Schema.NullOr(TurnQueueItemId), sourceProposedPlanThreadId: Schema.NullOr(ThreadId), sourceProposedPlanId: Schema.NullOr(OrchestrationProposedPlanId), requestedAt: IsoDateTime, diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 3ae2885f024..8d598f581a6 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -170,6 +170,7 @@ const makeDefaultOrchestrationReadModel = () => { archivedAt: null, latestTurn: null, messages: [], + queuedTurns: [], session: null, activities: [], proposedPlans: [], @@ -3233,6 +3234,7 @@ it.layer(NodeServices.layer)("server router seam", (it) => { archivedAt: null, latestTurn: null, messages: [], + queuedTurns: [], session: null, activities: [], proposedPlans: [], @@ -3968,7 +3970,7 @@ it.layer(NodeServices.layer)("server router seam", (it) => { const finalCommand = dispatchedCommands[4]; assertTrue(finalCommand?.type === "thread.turn.start"); if (finalCommand?.type === "thread.turn.start") { - assert.equal(finalCommand.bootstrap, undefined); + assert.ok(!("bootstrap" in finalCommand) || finalCommand.bootstrap === undefined); } }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index c6780559204..462f596b769 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -40,6 +40,7 @@ import { OrchestrationReactorLive } from "./orchestration/Layers/OrchestrationRe import { RuntimeReceiptBusLive } from "./orchestration/Layers/RuntimeReceiptBus.ts"; import { ProviderRuntimeIngestionLive } from "./orchestration/Layers/ProviderRuntimeIngestion.ts"; import { ProviderCommandReactorLive } from "./orchestration/Layers/ProviderCommandReactor.ts"; +import { QueuedTurnDrainReactorLive } from "./orchestration/Layers/QueuedTurnDrainReactor.ts"; import { CheckpointReactorLive } from "./orchestration/Layers/CheckpointReactor.ts"; import { ThreadDeletionReactorLive } from "./orchestration/Layers/ThreadDeletionReactor.ts"; import { ProviderRegistryLive } from "./provider/Layers/ProviderRegistry.ts"; @@ -143,6 +144,7 @@ const ReactorLayerLive = Layer.empty.pipe( Layer.provideMerge(OrchestrationReactorLive), Layer.provideMerge(ProviderRuntimeIngestionLive), Layer.provideMerge(ProviderCommandReactorLive), + Layer.provideMerge(QueuedTurnDrainReactorLive), Layer.provideMerge(CheckpointReactorLive), Layer.provideMerge(ThreadDeletionReactorLive), Layer.provideMerge(RuntimeReceiptBusLive), diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index e99672161c9..31a2a088cd9 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -358,7 +358,12 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => }; const dispatchBootstrapTurnStart = ( - command: Extract, + command: Extract & + Readonly<{ + bootstrap: NonNullable< + Extract["bootstrap"] + >; + }>, ): Effect.Effect<{ readonly sequence: number }, OrchestrationDispatchCommandError> => Effect.gen(function* () { const bootstrap = command.bootstrap; @@ -552,7 +557,10 @@ const makeWsRpcLayer = (currentSessionId: AuthSessionId) => ): Effect.Effect<{ readonly sequence: number }, OrchestrationDispatchCommandError> => { const dispatchEffect = normalizedCommand.type === "thread.turn.start" && normalizedCommand.bootstrap - ? dispatchBootstrapTurnStart(normalizedCommand) + ? dispatchBootstrapTurnStart({ + ...normalizedCommand, + bootstrap: normalizedCommand.bootstrap, + }) : orchestrationEngine .dispatch(normalizedCommand) .pipe( diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index bb62d8edbc0..d439002e036 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -363,6 +363,7 @@ function createSnapshotForTargetUser(options: { archivedAt: null, deletedAt: null, messages, + queuedTurns: [], activities: [], proposedPlans: [], checkpoints: [], @@ -428,6 +429,7 @@ function addThreadToSnapshot( archivedAt: null, deletedAt: null, messages: [], + queuedTurns: [], activities: [], proposedPlans: [], checkpoints: [], @@ -762,6 +764,7 @@ function createSnapshotWithSecondaryProject(options?: { updatedAt: isoAt(31), deletedAt: null, messages: [], + queuedTurns: [], activities: [], proposedPlans: [], checkpoints: [], @@ -794,6 +797,7 @@ function createSnapshotWithSecondaryProject(options?: { updatedAt: isoAt(25), deletedAt: null, messages: [], + queuedTurns: [], activities: [], proposedPlans: [], checkpoints: [], diff --git a/apps/web/src/components/ChatView.logic.test.ts b/apps/web/src/components/ChatView.logic.test.ts index 83c90edaddc..000d8260ad8 100644 --- a/apps/web/src/components/ChatView.logic.test.ts +++ b/apps/web/src/components/ChatView.logic.test.ts @@ -1,11 +1,13 @@ import { scopeThreadRef } from "@t3tools/client-runtime"; import { EnvironmentId, + MessageId, ProjectId, ProviderDriverKind, ProviderInstanceId, ThreadId, TurnId, + TurnQueueItemId, } from "@t3tools/contracts"; import { afterEach, describe, expect, it, vi } from "vitest"; import { type EnvironmentState, useStore } from "../store"; @@ -214,6 +216,9 @@ describe("shouldWriteThreadErrorToCurrentServerThread", () => { const makeThread = (input?: { id?: ThreadId; + session?: Thread["session"]; + messages?: Thread["messages"]; + queuedTurns?: Thread["queuedTurns"]; latestTurn?: { turnId: TurnId; state: "running" | "completed"; @@ -230,8 +235,9 @@ const makeThread = (input?: { modelSelection: { instanceId: ProviderInstanceId.make("codex"), model: "gpt-5.4" }, runtimeMode: "full-access" as const, interactionMode: "default" as const, - session: null, - messages: [], + session: input?.session ?? null, + messages: input?.messages ?? [], + queuedTurns: input?.queuedTurns ?? [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -314,6 +320,20 @@ function setStoreThreads(threads: ReadonlyArray>) Object.fromEntries(thread.messages.map((message) => [message.id, message])), ]), ), + queuedTurnIdsByThreadId: Object.fromEntries( + threads.map((thread) => [ + thread.id, + thread.queuedTurns.map((queuedTurn) => queuedTurn.queueItemId), + ]), + ), + queuedTurnByThreadId: Object.fromEntries( + threads.map((thread) => [ + thread.id, + Object.fromEntries( + thread.queuedTurns.map((queuedTurn) => [queuedTurn.queueItemId, queuedTurn]), + ), + ]), + ), activityIdsByThreadId: Object.fromEntries( threads.map((thread) => [thread.id, thread.activities.map((activity) => activity.id)]), ), @@ -478,6 +498,7 @@ describe("hasServerAcknowledgedLocalDispatch", () => { interactionMode: "default", session: previousSession, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -496,6 +517,8 @@ describe("hasServerAcknowledgedLocalDispatch", () => { phase: "ready", latestTurn: previousLatestTurn, session: previousSession, + messages: [], + queuedTurns: [], hasPendingApproval: false, hasPendingUserInput: false, threadError: null, @@ -515,6 +538,7 @@ describe("hasServerAcknowledgedLocalDispatch", () => { interactionMode: "default", session: previousSession, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -542,6 +566,8 @@ describe("hasServerAcknowledgedLocalDispatch", () => { ...previousSession, updatedAt: "2026-03-29T00:01:30.000Z", }, + messages: [], + queuedTurns: [], hasPendingApproval: false, hasPendingUserInput: false, threadError: null, @@ -561,6 +587,7 @@ describe("hasServerAcknowledgedLocalDispatch", () => { interactionMode: "default", session: previousSession, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -585,6 +612,8 @@ describe("hasServerAcknowledgedLocalDispatch", () => { activeTurnId: TurnId.make("turn-2"), updatedAt: "2026-03-29T00:01:00.000Z", }, + messages: [], + queuedTurns: [], hasPendingApproval: false, hasPendingUserInput: false, threadError: null, @@ -604,6 +633,7 @@ describe("hasServerAcknowledgedLocalDispatch", () => { interactionMode: "default", session: previousSession, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -628,6 +658,8 @@ describe("hasServerAcknowledgedLocalDispatch", () => { activeTurnId: undefined, updatedAt: "2026-03-29T00:01:00.000Z", }, + messages: [], + queuedTurns: [], hasPendingApproval: false, hasPendingUserInput: false, threadError: null, @@ -647,6 +679,7 @@ describe("hasServerAcknowledgedLocalDispatch", () => { interactionMode: "default", session: previousSession, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -678,6 +711,8 @@ describe("hasServerAcknowledgedLocalDispatch", () => { activeTurnId: TurnId.make("turn-2"), updatedAt: "2026-03-29T00:01:01.000Z", }, + messages: [], + queuedTurns: [], hasPendingApproval: false, hasPendingUserInput: false, threadError: null, @@ -697,6 +732,7 @@ describe("hasServerAcknowledgedLocalDispatch", () => { interactionMode: "default", session: previousSession, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-29T00:00:00.000Z", @@ -718,6 +754,184 @@ describe("hasServerAcknowledgedLocalDispatch", () => { ...previousSession, updatedAt: "2026-03-29T00:00:11.000Z", }, + messages: [], + queuedTurns: [], + hasPendingApproval: false, + hasPendingUserInput: false, + threadError: null, + }), + ).toBe(true); + }); + + it("does not clear queued local dispatch before the queued message is reflected", () => { + const messageId = MessageId.make("message-queued-pending"); + const localDispatch = createLocalDispatchSnapshot( + makeThread({ + session: previousSession, + latestTurn: previousLatestTurn, + }), + { + delivery: "queue", + messageId, + }, + ); + + expect( + hasServerAcknowledgedLocalDispatch({ + localDispatch, + phase: "running", + latestTurn: previousLatestTurn, + session: { + ...previousSession, + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-2"), + updatedAt: "2026-03-29T00:01:00.000Z", + }, + messages: [], + queuedTurns: [], + hasPendingApproval: false, + hasPendingUserInput: false, + threadError: null, + }), + ).toBe(false); + }); + + it("clears queued local dispatch when the queued turn is reflected on the thread", () => { + const messageId = MessageId.make("message-queued-reflected"); + const localDispatch = createLocalDispatchSnapshot( + makeThread({ + session: previousSession, + latestTurn: previousLatestTurn, + }), + { + delivery: "queue", + messageId, + }, + ); + + expect( + hasServerAcknowledgedLocalDispatch({ + localDispatch, + phase: "running", + latestTurn: previousLatestTurn, + session: { + ...previousSession, + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-2"), + updatedAt: "2026-03-29T00:01:00.000Z", + }, + messages: [], + queuedTurns: [ + { + queueItemId: TurnQueueItemId.make("queue-item-1"), + request: { + message: { + messageId, + role: "user", + text: "queued follow-up", + attachments: [], + }, + }, + status: "pending", + failureReason: null, + createdAt: "2026-03-29T00:01:00.000Z", + updatedAt: "2026-03-29T00:01:00.000Z", + }, + ], + hasPendingApproval: false, + hasPendingUserInput: false, + threadError: null, + }), + ).toBe(true); + }); + + it("clears queued local dispatch after the queued message is resolved into server messages", () => { + const messageId = MessageId.make("message-queued-resolved"); + const localDispatch = createLocalDispatchSnapshot( + makeThread({ + session: previousSession, + latestTurn: previousLatestTurn, + }), + { + delivery: "queue", + messageId, + }, + ); + + expect( + hasServerAcknowledgedLocalDispatch({ + localDispatch, + phase: "running", + latestTurn: previousLatestTurn, + session: { + ...previousSession, + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-2"), + updatedAt: "2026-03-29T00:01:00.000Z", + }, + messages: [ + { + id: messageId, + role: "user", + text: "queued follow-up", + createdAt: "2026-03-29T00:01:00.000Z", + streaming: false, + }, + ], + queuedTurns: [], + hasPendingApproval: false, + hasPendingUserInput: false, + threadError: null, + }), + ).toBe(true); + }); + + it("clears queued local dispatch when the queued turn fails after admission", () => { + const messageId = MessageId.make("message-queued-failed"); + const localDispatch = createLocalDispatchSnapshot( + makeThread({ + session: previousSession, + latestTurn: previousLatestTurn, + }), + { + delivery: "queue", + messageId, + }, + ); + + expect( + hasServerAcknowledgedLocalDispatch({ + localDispatch, + phase: "running", + latestTurn: previousLatestTurn, + session: { + ...previousSession, + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.make("turn-2"), + updatedAt: "2026-03-29T00:01:00.000Z", + }, + messages: [], + queuedTurns: [ + { + queueItemId: TurnQueueItemId.make("queue-item-2"), + request: { + message: { + messageId, + role: "user", + text: "queued failure", + attachments: [], + }, + }, + status: "failed", + failureReason: "Provider send failed.", + createdAt: "2026-03-29T00:01:00.000Z", + updatedAt: "2026-03-29T00:01:05.000Z", + }, + ], hasPendingApproval: false, hasPendingUserInput: false, threadError: null, diff --git a/apps/web/src/components/ChatView.logic.ts b/apps/web/src/components/ChatView.logic.ts index bf87add28d9..e569be132e4 100644 --- a/apps/web/src/components/ChatView.logic.ts +++ b/apps/web/src/components/ChatView.logic.ts @@ -1,5 +1,6 @@ import { type EnvironmentId, + type MessageId, isProviderDriverKind, ProjectId, type ModelSelection, @@ -41,6 +42,7 @@ export function buildLocalDraftThread( interactionMode: draftThread.interactionMode, session: null, messages: [], + queuedTurns: [], error, createdAt: draftThread.createdAt, archivedAt: null, @@ -309,6 +311,8 @@ export async function waitForStartedServerThread( export interface LocalDispatchSnapshot { startedAt: string; preparingWorktree: boolean; + delivery: "queue" | "steer"; + messageId: MessageId | null; latestTurnTurnId: TurnId | null; latestTurnRequestedAt: string | null; latestTurnStartedAt: string | null; @@ -319,13 +323,19 @@ export interface LocalDispatchSnapshot { export function createLocalDispatchSnapshot( activeThread: Thread | undefined, - options?: { preparingWorktree?: boolean }, + options?: { + preparingWorktree?: boolean; + delivery?: "queue" | "steer"; + messageId?: MessageId; + }, ): LocalDispatchSnapshot { const latestTurn = activeThread?.latestTurn ?? null; const session = activeThread?.session ?? null; return { startedAt: new Date().toISOString(), preparingWorktree: Boolean(options?.preparingWorktree), + delivery: options?.delivery ?? "steer", + messageId: options?.messageId ?? null, latestTurnTurnId: latestTurn?.turnId ?? null, latestTurnRequestedAt: latestTurn?.requestedAt ?? null, latestTurnStartedAt: latestTurn?.startedAt ?? null, @@ -340,6 +350,8 @@ export function hasServerAcknowledgedLocalDispatch(input: { phase: SessionPhase; latestTurn: Thread["latestTurn"] | null; session: Thread["session"] | null; + messages: Thread["messages"]; + queuedTurns: Thread["queuedTurns"]; hasPendingApproval: boolean; hasPendingUserInput: boolean; threadError: string | null | undefined; @@ -353,6 +365,17 @@ export function hasServerAcknowledgedLocalDispatch(input: { const latestTurn = input.latestTurn ?? null; const session = input.session ?? null; + + if (input.localDispatch.delivery === "queue" && input.localDispatch.messageId !== null) { + const messageId = input.localDispatch.messageId; + // Queue-mode dispatch is acknowledged once the server durably reflects the + // message into thread state, not when a later turn eventually starts. + return ( + input.messages.some((message) => message.id === messageId && message.role === "user") || + input.queuedTurns.some((queuedTurn) => queuedTurn.request.message.messageId === messageId) + ); + } + const latestTurnChanged = input.localDispatch.latestTurnTurnId !== (latestTurn?.turnId ?? null) || input.localDispatch.latestTurnRequestedAt !== (latestTurn?.requestedAt ?? null) || diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index d66d2487ce3..c83d7528cff 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -13,6 +13,7 @@ import { type ResolvedKeybindingsConfig, type ScopedThreadRef, type ThreadId, + type TurnQueueItemId, type TurnId, type KeybindingCommand, OrchestrationThreadActivity, @@ -104,7 +105,7 @@ import { BranchToolbar } from "./BranchToolbar"; import { resolveShortcutCommand, shortcutLabelForCommand } from "../keybindings"; import PlanSidebar from "./PlanSidebar"; import ThreadTerminalDrawer from "./ThreadTerminalDrawer"; -import { ChevronDownIcon, TriangleAlertIcon, WifiOffIcon } from "lucide-react"; +import { ChevronDownIcon, Clock3Icon, TriangleAlertIcon, WifiOffIcon } from "lucide-react"; import { cn, randomUUID } from "~/lib/utils"; import { stackedThreadToast, toastManager } from "./ui/toast"; import { decodeProjectScriptKeybindingRule } from "~/lib/projectScriptKeybindings"; @@ -372,13 +373,29 @@ function useLocalDispatchState(input: { const [localDispatch, setLocalDispatch] = useState(null); const beginLocalDispatch = useCallback( - (options?: { preparingWorktree?: boolean }) => { + (options?: { + preparingWorktree?: boolean; + delivery?: "queue" | "steer"; + messageId?: MessageId; + }) => { const preparingWorktree = Boolean(options?.preparingWorktree); setLocalDispatch((current) => { if (current) { - return current.preparingWorktree === preparingWorktree - ? current - : { ...current, preparingWorktree }; + const nextDelivery = options?.delivery ?? current.delivery; + const nextMessageId = options?.messageId ?? current.messageId; + if ( + current.preparingWorktree === preparingWorktree && + current.delivery === nextDelivery && + current.messageId === nextMessageId + ) { + return current; + } + return { + ...current, + preparingWorktree, + delivery: nextDelivery, + messageId: nextMessageId, + }; } return createLocalDispatchSnapshot(input.activeThread, options); }); @@ -397,6 +414,8 @@ function useLocalDispatchState(input: { phase: input.phase, latestTurn: input.activeLatestTurn, session: input.activeThread?.session ?? null, + messages: input.activeThread?.messages ?? [], + queuedTurns: input.activeThread?.queuedTurns ?? [], hasPendingApproval: input.activePendingApproval !== null, hasPendingUserInput: input.activePendingUserInput !== null, threadError: input.threadError, @@ -405,6 +424,8 @@ function useLocalDispatchState(input: { input.activeLatestTurn, input.activePendingApproval, input.activePendingUserInput, + input.activeThread?.messages, + input.activeThread?.queuedTurns, input.activeThread?.session, input.phase, input.threadError, @@ -1178,7 +1199,31 @@ export default function ChatView(props: ChatViewProps) { savedEnvironmentRuntimeById, serverConfig?.environment.label, ]); - const composerBannerItems = useMemo(() => { + async function handleRetryQueuedTurn(queueItemId: TurnQueueItemId) { + if (!activeThread || isConnecting || activeEnvironmentUnavailable) { + return; + } + const api = readEnvironmentApi(environmentId); + if (!api) { + return; + } + setThreadError(activeThread.id, null); + try { + await api.orchestration.dispatchCommand({ + type: "thread.queued-turn.retry", + commandId: newCommandId(), + threadId: activeThread.id, + queueItemId, + createdAt: new Date().toISOString(), + }); + } catch (err) { + setThreadError( + activeThread.id, + err instanceof Error ? err.message : "Failed to retry queued turn.", + ); + } + } + const composerBannerItems: ComposerBannerStackItem[] = (() => { const items: ComposerBannerStackItem[] = []; if (activeEnvironmentUnavailableState) { items.push({ @@ -1245,17 +1290,61 @@ export default function ChatView(props: ChatViewProps) { }, }); } + const queuedTurns = activeThread?.queuedTurns ?? []; + const failedQueuedTurns = queuedTurns.filter((queuedTurn) => queuedTurn.status === "failed"); + const pendingQueuedTurnCount = queuedTurns.filter( + (queuedTurn) => queuedTurn.status === "pending", + ).length; + const sendingQueuedTurnCount = queuedTurns.filter( + (queuedTurn) => queuedTurn.status === "sending", + ).length; + for (const failedQueuedTurn of failedQueuedTurns) { + const queuedMessageText = failedQueuedTurn.request.message.text.trim(); + const queuedMessagePreview = + queuedMessageText && queuedMessageText.length > 0 ? truncate(queuedMessageText, 140) : null; + items.push({ + id: `queued-turn-failed:${failedQueuedTurn.queueItemId}`, + variant: "error", + icon: , + title: "Queued turn failed", + description: ( + <> + {queuedMessagePreview ? ( + <> + “{queuedMessagePreview}”{" "} + + ) : null} + {failedQueuedTurn.failureReason ?? + "The queued turn could not be sent. Retry it after the current session settles."} + + ), + actions: ( + + ), + }); + } + if (pendingQueuedTurnCount > 0 || sendingQueuedTurnCount > 0) { + const queuedTurnCount = pendingQueuedTurnCount + sendingQueuedTurnCount; + items.push({ + id: `queued-turns:${activeThread?.id ?? "none"}`, + variant: "info", + icon: , + title: queuedTurnCount === 1 ? "1 turn is queued" : `${queuedTurnCount} turns are queued`, + description: + sendingQueuedTurnCount > 0 && pendingQueuedTurnCount > 0 + ? `${sendingQueuedTurnCount} sending, ${pendingQueuedTurnCount} waiting for the thread to become ready.` + : sendingQueuedTurnCount > 0 + ? "The next queued turn is being sent now." + : "Queued turns will send automatically when the thread becomes ready.", + }); + } return items; - }, [ - activeEnvironmentUnavailableState, - handleReconnectActiveEnvironment, - navigate, - reconnectingEnvironmentId, - showVersionMismatchBanner, - versionMismatch, - versionMismatchDismissKey, - versionMismatchServerLabel, - ]); + })(); const providerStatuses = serverConfig?.providers ?? EMPTY_PROVIDERS; const unlockedSelectedProvider = resolveSelectableProvider( providerStatuses, @@ -2635,6 +2724,7 @@ export default function ChatView(props: ChatViewProps) { selectedPromptEffort: ctxSelectedPromptEffort, selectedModelSelection: ctxSelectedModelSelection, } = sendCtx; + const delivery = phase === "running" ? "queue" : "steer"; const promptForSend = promptRef.current; const { trimmedPrompt: trimmed, @@ -2704,9 +2794,6 @@ export default function ChatView(props: ChatViewProps) { return; } - sendInFlightRef.current = true; - beginLocalDispatch({ preparingWorktree: Boolean(baseBranchForWorktree) }); - const composerImagesSnapshot = [...composerImages]; const composerTerminalContextsSnapshot = [...sendableComposerTerminalContexts]; const messageTextForSend = appendTerminalContextsToPrompt( @@ -2715,6 +2802,12 @@ export default function ChatView(props: ChatViewProps) { ); const messageIdForSend = newMessageId(); const messageCreatedAt = new Date().toISOString(); + sendInFlightRef.current = true; + beginLocalDispatch({ + preparingWorktree: Boolean(baseBranchForWorktree), + delivery, + messageId: messageIdForSend, + }); const outgoingMessageText = formatOutgoingPrompt({ provider: ctxSelectedProvider, model: ctxSelectedModel, @@ -2854,23 +2947,38 @@ export default function ChatView(props: ChatViewProps) { } : undefined; beginLocalDispatch({ preparingWorktree: false }); - await api.orchestration.dispatchCommand({ - type: "thread.turn.start", - commandId: newCommandId(), - threadId: threadIdForSend, - message: { - messageId: messageIdForSend, - role: "user", - text: outgoingMessageText, - attachments: turnAttachments, - }, - modelSelection: ctxSelectedModelSelection, - titleSeed: title, - runtimeMode, - interactionMode, - ...(bootstrap ? { bootstrap } : {}), - createdAt: messageCreatedAt, - }); + const turnStartCommand = + delivery === "queue" + ? { + type: "thread.turn.queue" as const, + commandId: newCommandId(), + threadId: threadIdForSend, + message: { + messageId: messageIdForSend, + role: "user" as const, + text: outgoingMessageText, + attachments: turnAttachments, + }, + createdAt: messageCreatedAt, + } + : { + type: "thread.turn.start" as const, + commandId: newCommandId(), + threadId: threadIdForSend, + message: { + messageId: messageIdForSend, + role: "user" as const, + text: outgoingMessageText, + attachments: turnAttachments, + }, + modelSelection: ctxSelectedModelSelection, + titleSeed: title, + runtimeMode, + interactionMode, + ...(bootstrap ? { bootstrap } : {}), + createdAt: messageCreatedAt, + }; + await api.orchestration.dispatchCommand(turnStartCommand); turnStartSucceeded = true; })().catch(async (err: unknown) => { if ( @@ -3133,7 +3241,10 @@ export default function ChatView(props: ChatViewProps) { }); sendInFlightRef.current = true; - beginLocalDispatch({ preparingWorktree: false }); + beginLocalDispatch({ + preparingWorktree: false, + messageId: messageIdForSend, + }); setThreadError(threadIdForSend, null); // Scroll to the current end *before* adding the optimistic message. diff --git a/apps/web/src/components/CommandPalette.logic.test.ts b/apps/web/src/components/CommandPalette.logic.test.ts index 38b44f3f6a7..317f1cac2e8 100644 --- a/apps/web/src/components/CommandPalette.logic.test.ts +++ b/apps/web/src/components/CommandPalette.logic.test.ts @@ -22,6 +22,7 @@ function makeThread(overrides: Partial = {}): Thread { interactionMode: "default", session: null, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-01T00:00:00.000Z", diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index 611eaf572d0..6389e426436 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -182,6 +182,7 @@ function createMinimalSnapshot(): OrchestrationReadModel { updatedAt: NOW_ISO, }, ], + queuedTurns: [], activities: [], proposedPlans: [], checkpoints: [], diff --git a/apps/web/src/components/Sidebar.logic.test.ts b/apps/web/src/components/Sidebar.logic.test.ts index 926c117c1c0..73204230868 100644 --- a/apps/web/src/components/Sidebar.logic.test.ts +++ b/apps/web/src/components/Sidebar.logic.test.ts @@ -732,6 +732,7 @@ function makeThread(overrides: Partial = {}): Thread { interactionMode: DEFAULT_INTERACTION_MODE, session: null, messages: [], + queuedTurns: [], proposedPlans: [], error: null, createdAt: "2026-03-09T10:00:00.000Z", diff --git a/apps/web/src/components/chat/ChatComposer.tsx b/apps/web/src/components/chat/ChatComposer.tsx index 96fc34c1013..aa03602e5b9 100644 --- a/apps/web/src/components/chat/ChatComposer.tsx +++ b/apps/web/src/components/chat/ChatComposer.tsx @@ -965,7 +965,7 @@ export const ChatComposer = memo(function ChatComposer(props: ChatComposerProps) return `pending:${activePendingProgress.questionIndex}:${activePendingProgress.isLastQuestion}:${activePendingIsResponding}`; } if (phase === "running") { - return "running"; + return composerSendState.hasSendableContent ? "running:queue" : "running:interrupt"; } if (showPlanFollowUpPrompt) { return prompt.trim().length > 0 ? "plan:refine" : "plan:implement"; @@ -1050,8 +1050,9 @@ export const ChatComposer = memo(function ChatComposer(props: ChatComposerProps) [activePendingIsResponding, activePendingProgress, activePendingResolvedAnswers], ); const collapsedComposerPrimaryActionDisabled = - phase === "running" || isSendBusy || isConnecting || !composerSendState.hasSendableContent; - const collapsedComposerPrimaryActionLabel = "Send message"; + isSendBusy || isConnecting || !composerSendState.hasSendableContent; + const collapsedComposerPrimaryActionLabel = + phase === "running" ? "Queue message" : "Send message"; const showMobilePendingAnswerActions = isMobileViewport && !isComposerCollapsedMobile && pendingPrimaryAction !== null; @@ -1596,7 +1597,7 @@ export const ChatComposer = memo(function ChatComposer(props: ChatComposerProps) const shouldBlurMobileComposerOnSubmit = useCallback(() => { if (!isMobileViewport) return false; - if (isSendBusy || isConnecting || phase === "running") return false; + if (isSendBusy || isConnecting) return false; if (activePendingProgress) { return activePendingProgress.isLastQuestion && Boolean(activePendingResolvedAnswers); } @@ -1608,7 +1609,6 @@ export const ChatComposer = memo(function ChatComposer(props: ChatComposerProps) isConnecting, isMobileViewport, isSendBusy, - phase, showPlanFollowUpPrompt, ]); diff --git a/apps/web/src/components/chat/ComposerPrimaryActions.tsx b/apps/web/src/components/chat/ComposerPrimaryActions.tsx index fbeb9de30b8..276d2044ebb 100644 --- a/apps/web/src/components/chat/ComposerPrimaryActions.tsx +++ b/apps/web/src/components/chat/ComposerPrimaryActions.tsx @@ -123,6 +123,33 @@ export const ComposerPrimaryActions = memo(function ComposerPrimaryActions({ } if (isRunning) { + if (hasSendableContent) { + return ( +
+ + +
+ ); + } + return (