Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CHAT_CONTENT_MAX_WIDTH } from "@features/sessions/constants";
import { useContextUsage } from "@features/sessions/hooks/useContextUsage";
import { useConversationItems } from "@features/sessions/hooks/useConversationItems";
import { useConversationSearch } from "@features/sessions/hooks/useConversationSearch";
import { SessionTaskIdProvider } from "@features/sessions/hooks/useSessionTaskId";
import {
Expand All @@ -24,11 +25,7 @@ import { Box, Flex, Text } from "@radix-ui/themes";
import type { Task } from "@shared/types";
import type { AcpMessage } from "@shared/types/session-events";
import { memo, useCallback, useEffect, useMemo, useRef, useState } from "react";
import {
buildConversationItems,
type ConversationItem,
type TurnContext,
} from "./buildConversationItems";
import type { ConversationItem, TurnContext } from "./buildConversationItems";
import { ConversationSearchBar } from "./ConversationSearchBar";
import { GitActionMessage } from "./GitActionMessage";
import { GitActionResult } from "./GitActionResult";
Expand Down Expand Up @@ -82,19 +79,21 @@ export function ConversationView({
const [showScrollButton, setShowScrollButton] = useState(false);
const debugLogsCloudRuns = useSettingsStore((s) => s.debugLogsCloudRuns);
const showDebugLogs = debugLogsCloudRuns;

const contextUsage = useContextUsage(events);

// Streaming appends one event per token. The parse is incremental — each
// event is handled once and completed turns are reused by reference — so per
// token the work tracks the active turn, not the whole thread. We feed
// `events` directly (no frame-throttle) so a sent message's optimistic->real
// swap is never delayed past the frame the store commits it.
const {
items: conversationItems,
lastTurnInfo,
isCompacting,
} = useMemo(
() =>
buildConversationItems(events, isPromptPending, {
showDebugLogs,
}),
[events, isPromptPending, showDebugLogs],
);
} = useConversationItems(events, isPromptPending, {
showDebugLogs,
});

const firstUserMessageIdRef = useRef<string | undefined>(undefined);
if (firstUserMessageIdRef.current === undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ interface ProgressCardState {
steps: Step[];
isActive: boolean;
};
/** Index in `items` where this card sits. */
itemIndex: number;
}

interface TurnState {
Expand All @@ -94,9 +96,13 @@ interface TurnState {
itemCount: number;
}

interface ItemBuilder {
export interface ItemBuilder {
items: ConversationItem[];
currentTurn: TurnState | null;
/** Index in `items` where the current turn's first item sits. Lets an
* incremental consumer treat everything before it (completed turns) as
* frozen and only re-derive the active turn. */
currentTurnStartIndex: number;
pendingPrompts: Map<number, TurnState>;
shellExecutes: Map<string, { item: UserShellExecute; index: number }>;
isCompacting: boolean;
Expand All @@ -106,18 +112,25 @@ interface ItemBuilder {
* event for the same id mutates the same card, regardless of which turn is
* currently active. */
progressCards: Map<string, ProgressCardState>;
/** Lowest item index touched by a progress event since it was last reset.
* An incremental consumer resets this before feeding a batch of events and
* reads it after to detect a card being mutated inside an already frozen
* (completed) turn, which would otherwise go unseen. */
lowestTouchedProgressIndex: number;
}

function createItemBuilder(): ItemBuilder {
export function createItemBuilder(): ItemBuilder {
let idCounter = 0;
return {
items: [],
currentTurn: null,
currentTurnStartIndex: 0,
pendingPrompts: new Map(),
shellExecutes: new Map(),
isCompacting: false,
nextId: () => idCounter++,
progressCards: new Map(),
lowestTouchedProgressIndex: Number.POSITIVE_INFINITY,
};
}

Expand All @@ -130,7 +143,7 @@ function isThoughtItem(
);
}

function markThoughtCompletion(items: ConversationItem[]) {
export function markThoughtCompletion(items: ConversationItem[]) {
const seenContexts = new Set<TurnContext>();

for (let i = items.length - 1; i >= 0; i--) {
Expand Down Expand Up @@ -172,23 +185,53 @@ export function buildConversationItems(
const b = createItemBuilder();

for (const event of events) {
const msg = event.message;
processEvent(b, event, options);
}

if (isJsonRpcNotification(msg)) {
handleNotification(b, msg, event.ts, options);
continue;
}
finalizeBuilder(b, isPromptPending);

if (isJsonRpcRequest(msg) && msg.method === "session/prompt") {
handlePromptRequest(b, msg, event.ts);
continue;
}
const lastTurnInfo = readLastTurnInfo(b);

if (isJsonRpcResponse(msg) && b.pendingPrompts.has(msg.id)) {
handlePromptResponse(b, msg, event.ts);
}
return { items: b.items, lastTurnInfo, isCompacting: b.isCompacting };
}

/**
* Apply one raw event to the builder. This is the append-only core: it never
* runs end-of-stream finalization, so it is safe to call incrementally as new
* events arrive without corrupting prior state.
*/
export function processEvent(
b: ItemBuilder,
event: AcpMessage,
options?: BuildConversationOptions,
) {
const msg = event.message;

if (isJsonRpcNotification(msg)) {
handleNotification(b, msg, event.ts, options);
return;
}

if (isJsonRpcRequest(msg) && msg.method === "session/prompt") {
handlePromptRequest(b, msg, event.ts);
return;
}

if (isJsonRpcResponse(msg) && b.pendingPrompts.has(msg.id)) {
handlePromptResponse(b, msg, event.ts);
}
}

/**
* End-of-stream finalization: speculative completions that assume no further
* events arrive. Mutates the builder in place, so an incremental consumer must
* only apply it to a snapshot it is about to read, never to state it will keep
* feeding events into.
*/
export function finalizeBuilder(
b: ItemBuilder,
isPromptPending: boolean | null,
) {
// Only mark unresolved prompts as cancelled when we actively track prompt
// state (local sessions). For cloud sessions isPromptPending is
// null, meaning that the response hasn't streamed "in" yet
Expand All @@ -207,16 +250,16 @@ export function buildConversationItems(
}

markThoughtCompletion(b.items);
}

const lastTurnInfo: LastTurnInfo | null = b.currentTurn
export function readLastTurnInfo(b: ItemBuilder): LastTurnInfo | null {
return b.currentTurn
? {
isComplete: b.currentTurn.isComplete,
durationMs: b.currentTurn.durationMs,
stopReason: b.currentTurn.stopReason,
}
: null;

return { items: b.items, lastTurnInfo, isCompacting: b.isCompacting };
}

function handlePromptRequest(
Expand Down Expand Up @@ -250,6 +293,7 @@ function handlePromptRequest(
turnComplete: false,
};

b.currentTurnStartIndex = b.items.length;
b.currentTurn = {
id: turnId,
promptId: msg.id,
Expand Down Expand Up @@ -460,6 +504,7 @@ function ensureProgressCardForGroup(
const card: ProgressCardState = {
steps: new Map(),
renderItem,
itemIndex: b.items.length,
};
b.progressCards.set(group, card);
pushItem(b, renderItem);
Expand Down Expand Up @@ -487,6 +532,9 @@ function handleProgress(b: ItemBuilder, rawParams: unknown, ts: number) {
const status = normalizeStepStatus(params.status);
const card = ensureProgressCardForGroup(b, params.group, ts);
if (!card) return;
if (card.itemIndex < b.lowestTouchedProgressIndex) {
b.lowestTouchedProgressIndex = card.itemIndex;
}
card.steps.set(params.step, {
key: params.step,
status,
Expand Down Expand Up @@ -525,6 +573,7 @@ function markCompactingStatusComplete(b: ItemBuilder) {
function ensureImplicitTurn(b: ItemBuilder, ts: number) {
if (b.currentTurn) return;

b.currentTurnStartIndex = b.items.length;
const turnId = `turn-${ts}-implicit`;
const toolCalls = new Map<string, ToolCall>();
const childItems = new Map<string, ConversationItem[]>();
Expand Down
Loading
Loading