fix(chat): Improve agent loop tracing #303
`@/chat/pi/traced-stream` (new file, +119 lines):

```typescript
import type { StreamFn } from "@mariozechner/pi-agent-core";
import {
  type Api,
  type AssistantMessage,
  type Context,
  type Model,
  streamSimple,
} from "@mariozechner/pi-ai";
import * as Sentry from "@/chat/sentry";
import {
  extractGenAiUsageAttributes,
  getLogContextAttributes,
  serializeGenAiAttribute,
} from "@/chat/logging";

const PROVIDER_NAME = "vercel-ai-gateway";

// Compose only the OTel GenAI attributes that are knowable at span start
// (request-shape + system instructions). End-of-call attributes such as
// usage and finish reasons are set after the stream resolves.
function buildChatStartAttributes(
  model: Model<Api>,
  context: Context,
): Record<string, string> {
  const attributes: Record<string, string> = {
    "gen_ai.operation.name": "chat",
    "gen_ai.provider.name": PROVIDER_NAME,
    "gen_ai.request.model": model.id,
  };

  const inputMessages = serializeGenAiAttribute(context.messages);
  if (inputMessages) {
    attributes["gen_ai.input.messages"] = inputMessages;
  }

  if (context.systemPrompt) {
    const systemInstructions = serializeGenAiAttribute([
      { type: "text", content: context.systemPrompt },
    ]);
    if (systemInstructions) {
      attributes["gen_ai.system_instructions"] = systemInstructions;
    }
  }

  return attributes;
}

// Composes post-stream attributes for the chat span. Two deferrals worth
// flagging: `extractGenAiUsageAttributes` currently omits cache token fields,
// and `gen_ai.response.finish_reasons` is emitted with pi-ai's raw StopReason
// (Sentry ingest expects the OTel set). Both are tracked separately and out
// of scope here.
function buildChatEndAttributes(
  message: AssistantMessage,
): Record<string, string | string[] | number> {
  const attributes: Record<string, string | string[] | number> = {};

  const outputMessages = serializeGenAiAttribute([message]);
  if (outputMessages) {
    attributes["gen_ai.output.messages"] = outputMessages;
  }

  Object.assign(attributes, extractGenAiUsageAttributes(message));

  if (message.stopReason) {
    attributes["gen_ai.response.finish_reasons"] = [message.stopReason];
  }

  if (message.model) {
    attributes["gen_ai.response.model"] = message.model;
  }

  return attributes;
}

/**
 * Wraps pi-ai's `streamSimple` so each LLM call inside a pi-agent-core agent
 * loop produces its own `gen_ai.chat` Sentry span. The returned function is
 * passed to `new Agent({ streamFn: ... })` and runs once per loop iteration.
 *
 * The base argument exists so tests can inject a stub stream function.
 */
export function createTracedStreamFn(base: StreamFn = streamSimple): StreamFn {
  return async (model, context, options) => {
    const span = Sentry.startInactiveSpan({
      name: `chat ${model.id}`,
      op: "gen_ai.chat",
      attributes: {
        ...getLogContextAttributes(),
        ...buildChatStartAttributes(model, context),
      },
    });

    try {
      const stream = await Sentry.withActiveSpan(span, () =>
        Promise.resolve(base(model, context, options)),
      );

      stream.result().then(
        (finalMessage) => {
          for (const [key, value] of Object.entries(
            buildChatEndAttributes(finalMessage),
          )) {
            span.setAttribute(key, value);
          }
          span.end();
        },
        () => {
          span.end();
        },
      );

      return stream;
    } catch (error) {
      span.end();
      throw error;
    }
  };
}
```
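The JSDoc on `createTracedStreamFn` names its integration point; as a rough usage sketch (hypothetical: the `Agent` import and whatever other constructor options it requires are not shown in this diff):

```typescript
import { Agent } from "@mariozechner/pi-agent-core";
import { createTracedStreamFn } from "@/chat/pi/traced-stream";

// Per the JSDoc: each LLM call in the agent loop now opens its own
// gen_ai.chat span. Other (hypothetical) Agent options are omitted.
const agent = new Agent({ streamFn: createTracedStreamFn() });
```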
Inline review comment on the `catch` path above:

**Error spans lack status, appearing successful in Sentry** (Medium Severity)

Reviewed by Cursor Bugbot for commit a9661dd.
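A minimal sketch of one way to address that finding, assuming Sentry v8's `span.setStatus({ code, message })` shape (0 = unset, 1 = ok, 2 = error); the helper name and structural type are hypothetical, though the test mock below does already stub `setStatus`:

```typescript
// Hypothetical helper: record an error status before ending the span so
// failed calls stop reading as successful in Sentry. The status shape is
// an assumption based on Sentry v8's span API.
function endSpanAsError(
  span: {
    setStatus: (status: { code: number; message?: string }) => void;
    end: () => void;
  },
  message = "internal_error",
): void {
  span.setStatus({ code: 2, message });
  span.end();
}
```

The `catch` block and the `.then` rejection arm could then call `endSpanAsError(span)` in place of the bare `span.end()`.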
Tests for `createTracedStreamFn` (new file, +207 lines):

```typescript
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
  createAssistantMessageEventStream,
  type AssistantMessage,
  type Model,
} from "@mariozechner/pi-ai";

const { startInactiveSpan, withActiveSpan } = vi.hoisted(() => {
  const span = {
    setAttribute: vi.fn(),
    setAttributes: vi.fn(),
    setStatus: vi.fn(),
    end: vi.fn(),
  };
  return {
    startInactiveSpan: vi.fn((_options: unknown) => span),
    withActiveSpan: vi.fn(<T>(_s: unknown, cb: () => T) => cb()),
  };
});

vi.mock("@/chat/sentry", () => ({
  startInactiveSpan,
  withActiveSpan,
}));

function fakeModel(id: string): Model<"anthropic-messages"> {
  return { id } as unknown as Model<"anthropic-messages">;
}

function fakeMessage(): AssistantMessage {
  return {
    role: "assistant",
    content: [{ type: "text", text: "hi" }],
    api: "anthropic-messages",
    provider: "vercel-ai-gateway",
    model: "openai/gpt-5.4",
    usage: {
      input: 100,
      output: 5,
      cacheRead: 0,
      cacheWrite: 0,
      totalTokens: 105,
      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
    },
    stopReason: "stop",
    timestamp: Date.now(),
  };
}

type SpanMock = {
  setAttribute: ReturnType<typeof vi.fn>;
  setAttributes: ReturnType<typeof vi.fn>;
  setStatus: ReturnType<typeof vi.fn>;
  end: ReturnType<typeof vi.fn>;
};

function getSpan(): SpanMock {
  return startInactiveSpan.mock.results[0]!.value as SpanMock;
}

describe("createTracedStreamFn", () => {
  afterEach(() => {
    vi.clearAllMocks();
    vi.resetModules();
  });

  it("opens a gen_ai.chat span when invoked", async () => {
    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
    const stream = createAssistantMessageEventStream();
    const base = vi.fn(() => stream);

    const traced = createTracedStreamFn(base as unknown as StreamFn);
    const returned = await traced(
      fakeModel("openai/gpt-5.4"),
      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
      undefined,
    );

    expect(returned).toBe(stream);
    expect(startInactiveSpan).toHaveBeenCalledTimes(1);
    const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
      name: string;
      op: string;
    };
    expect(opts.op).toBe("gen_ai.chat");
    expect(opts.name).toBe("chat openai/gpt-5.4");
  });

  it("sets gen_ai.input.messages and gen_ai.system_instructions on the chat span", async () => {
    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
    const stream = createAssistantMessageEventStream();
    const base = vi.fn(() => stream);

    const traced = createTracedStreamFn(base as unknown as StreamFn);
    await traced(
      fakeModel("openai/gpt-5.4"),
      {
        systemPrompt: "you are junior",
        messages: [{ role: "user", content: "hello", timestamp: 0 }],
      },
      undefined,
    );

    const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
      attributes: Record<string, unknown>;
    };
    expect(opts.attributes["gen_ai.provider.name"]).toBe("vercel-ai-gateway");
    expect(typeof opts.attributes["gen_ai.input.messages"]).toBe("string");
    expect(opts.attributes["gen_ai.input.messages"]).toContain("hello");
    expect(typeof opts.attributes["gen_ai.system_instructions"]).toBe("string");
    expect(opts.attributes["gen_ai.system_instructions"]).toContain(
      "you are junior",
    );
    expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
    expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
  });

  it("sets output.messages, usage tokens, finish_reasons, response.model after stream completion", async () => {
    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
    const stream = createAssistantMessageEventStream();
    const base = vi.fn(() => stream);

    const traced = createTracedStreamFn(base as unknown as StreamFn);
    const returned = await traced(
      fakeModel("openai/gpt-5.4"),
      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
      undefined,
    );

    expect(returned).toBe(stream);

    // Resolve the stream's terminal Promise to trigger end-attribute population.
    const finalMessage = fakeMessage();
    stream.end(finalMessage);
    await stream.result();
    // Allow the .then callback to flush.
    await new Promise((r) => setImmediate(r));

    const span = getSpan();
    const endAttributes = Object.fromEntries(
      span.setAttribute.mock.calls.map((c) => [c[0], c[1]]),
    );
    expect(typeof endAttributes["gen_ai.output.messages"]).toBe("string");
    expect(endAttributes["gen_ai.usage.input_tokens"]).toBe(100);
    expect(endAttributes["gen_ai.usage.output_tokens"]).toBe(5);
    expect(endAttributes["gen_ai.response.finish_reasons"]).toEqual(["stop"]);
    expect(endAttributes["gen_ai.response.model"]).toBe("openai/gpt-5.4");
    expect(span.end).toHaveBeenCalledTimes(1);
  });

  it("inherits LogContext attributes (e.g. gen_ai.conversation.id) onto the chat span", async () => {
    const { withLogContext } = await import("@/chat/logging");
    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
    const stream = createAssistantMessageEventStream();
    const base = vi.fn(() => stream);
    const traced = createTracedStreamFn(base as unknown as StreamFn);

    await withLogContext(
      { conversationId: "conv_123", turnId: "turn_456" },
      async () => {
        await traced(
          fakeModel("openai/gpt-5.4"),
          { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
          undefined,
        );
      },
    );

    const opts = startInactiveSpan.mock.calls[0]?.[0] as {
      attributes: Record<string, unknown>;
    };
    expect(opts.attributes["gen_ai.conversation.id"]).toBe("conv_123");
    expect(opts.attributes["app.turn.id"]).toBe("turn_456");
    // wrapper-supplied attributes still present
    expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
    expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
  });

  it("ends the span when the stream errors", async () => {
    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
    const stream = createAssistantMessageEventStream();
    const base = vi.fn(() => stream);

    const traced = createTracedStreamFn(base as unknown as StreamFn);
    await traced(
      fakeModel("openai/gpt-5.4"),
      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
      undefined,
    );

    // pi-ai's AssistantMessageEventStream resolves `result()` with the carrier
    // AssistantMessage on `error` events instead of rejecting, so the wrapper's
    // `.then` success arm runs on the error path. The load-bearing invariant
    // is that the span ends exactly once.
    const errorMessage = { ...fakeMessage(), stopReason: "error" as const };
    stream.push({ type: "error", reason: "error", error: errorMessage });
    await stream.result();
    await new Promise((r) => setImmediate(r));

    const span = getSpan();
    expect(span.end).toHaveBeenCalledTimes(1);
    // End attributes are still populated because the success arm runs.
    const endAttributeKeys = span.setAttribute.mock.calls.map((c) => c[0]);
    expect(endAttributeKeys).toContain("gen_ai.output.messages");
  });
});
```
**Span leaks if success callback throws unexpectedly** (Low Severity)

The `stream.result().then(successHandler, rejectionHandler)` call creates a floating promise. If any statement inside the success handler (e.g., `buildChatEndAttributes` or `span.setAttribute`) throws before `span.end()` is reached, the span never ends (leaks) and the error becomes an unhandled promise rejection. Wrapping the success handler body in `try/finally` with `span.end()` in the `finally` would ensure the span always closes, matching the defensive style of the `catch` block at line 114.

Reviewed by Cursor Bugbot for commit a9661dd.
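A sketch of that suggested hardening, factored as a self-contained helper (the `finishSpan` name and the structural span type are hypothetical; the loop body mirrors the wrapper's actual success arm):

```typescript
// Sketch: attribute population wrapped in try/finally so span.end() always
// runs, even if a setAttribute call throws partway through the entries.
type SpanLike = {
  setAttribute: (key: string, value: unknown) => void;
  end: () => void;
};

function finishSpan(
  span: SpanLike,
  endAttributes: Record<string, unknown>,
): void {
  try {
    for (const [key, value] of Object.entries(endAttributes)) {
      span.setAttribute(key, value);
    }
  } finally {
    // The span closes even on a throw; the error still surfaces as a
    // rejection of the enclosing promise chain.
    span.end();
  }
}
```

The wrapper's success arm would then reduce to `stream.result().then((m) => finishSpan(span, buildChatEndAttributes(m)), () => span.end())`.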