diff --git a/packages/junior/src/chat/pi/traced-stream.ts b/packages/junior/src/chat/pi/traced-stream.ts
new file mode 100644
index 00000000..d1f6aecf
--- /dev/null
+++ b/packages/junior/src/chat/pi/traced-stream.ts
@@ -0,0 +1,119 @@
+import type { StreamFn } from "@mariozechner/pi-agent-core";
+import {
+  type Api,
+  type AssistantMessage,
+  type Context,
+  type Model,
+  streamSimple,
+} from "@mariozechner/pi-ai";
+import * as Sentry from "@/chat/sentry";
+import {
+  extractGenAiUsageAttributes,
+  getLogContextAttributes,
+  serializeGenAiAttribute,
+} from "@/chat/logging";
+
+const PROVIDER_NAME = "vercel-ai-gateway";
+
+// Composes only the OTel GenAI attributes that are knowable at span start
+// (request shape + system instructions). End-of-call attributes such as
+// usage and finish reasons are set after the stream resolves.
+function buildChatStartAttributes(
+  model: Model<Api>,
+  context: Context,
+): Record<string, unknown> {
+  const attributes: Record<string, unknown> = {
+    "gen_ai.operation.name": "chat",
+    "gen_ai.provider.name": PROVIDER_NAME,
+    "gen_ai.request.model": model.id,
+  };
+
+  const inputMessages = serializeGenAiAttribute(context.messages);
+  if (inputMessages) {
+    attributes["gen_ai.input.messages"] = inputMessages;
+  }
+
+  if (context.systemPrompt) {
+    const systemInstructions = serializeGenAiAttribute([
+      { type: "text", content: context.systemPrompt },
+    ]);
+    if (systemInstructions) {
+      attributes["gen_ai.system_instructions"] = systemInstructions;
+    }
+  }
+
+  return attributes;
+}
+
+// Composes post-stream attributes for the chat span. Two deferrals worth
+// flagging: `extractGenAiUsageAttributes` currently omits cache token fields,
+// and `gen_ai.response.finish_reasons` is emitted with pi-ai's raw StopReason
+// (Sentry ingest expects the OTel set). Both are tracked separately and out
+// of scope here.
+function buildChatEndAttributes(
+  message: AssistantMessage,
+): Record<string, unknown> {
+  const attributes: Record<string, unknown> = {};
+
+  const outputMessages = serializeGenAiAttribute([message]);
+  if (outputMessages) {
+    attributes["gen_ai.output.messages"] = outputMessages;
+  }
+
+  Object.assign(attributes, extractGenAiUsageAttributes(message));
+
+  if (message.stopReason) {
+    attributes["gen_ai.response.finish_reasons"] = [message.stopReason];
+  }
+
+  if (message.model) {
+    attributes["gen_ai.response.model"] = message.model;
+  }
+
+  return attributes;
+}
+
+/**
+ * Wraps pi-ai's `streamSimple` so each LLM call inside a pi-agent-core agent
+ * loop produces its own `gen_ai.chat` Sentry span. The returned function is
+ * passed to `new Agent({ streamFn: ... })` and runs once per loop iteration.
+ *
+ * The `base` argument exists so tests can inject a stub stream function.
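+ *
+ * A minimal wiring sketch (mirrors the call site in respond.ts; every
+ * `Agent` option besides `streamFn` is elided here):
+ *
+ * @example
+ * ```ts
+ * import { Agent } from "@mariozechner/pi-agent-core";
+ * import { createTracedStreamFn } from "@/chat/pi/traced-stream";
+ *
+ * const agent = new Agent({
+ *   streamFn: createTracedStreamFn(), // defaults to pi-ai's streamSimple
+ *   // ...getApiKey, initialState, etc. as configured at the call site
+ * });
+ * ```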
+ */
+export function createTracedStreamFn(base: StreamFn = streamSimple): StreamFn {
+  return async (model, context, options) => {
+    const span = Sentry.startInactiveSpan({
+      name: `chat ${model.id}`,
+      op: "gen_ai.chat",
+      attributes: {
+        ...getLogContextAttributes(),
+        ...buildChatStartAttributes(model, context),
+      },
+    });
+
+    try {
+      const stream = await Sentry.withActiveSpan(span, () =>
+        Promise.resolve(base(model, context, options)),
+      );
+
+      stream.result().then(
+        (finalMessage) => {
+          for (const [key, value] of Object.entries(
+            buildChatEndAttributes(finalMessage),
+          )) {
+            span.setAttribute(key, value);
+          }
+          span.end();
+        },
+        () => {
+          span.end();
+        },
+      );
+
+      return stream;
+    } catch (error) {
+      span.end();
+      throw error;
+    }
+  };
+}
diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts
index e56da16c..bc9a6895 100644
--- a/packages/junior/src/chat/respond.ts
+++ b/packages/junior/src/chat/respond.ts
@@ -50,6 +50,7 @@ import {
   resolveGatewayModel,
 } from "@/chat/pi/client";
 import type { PiMessage } from "@/chat/pi/messages";
+import { createTracedStreamFn } from "@/chat/pi/traced-stream";
 import {
   createSandboxExecutor,
   type SandboxAcquiredState,
@@ -918,6 +919,7 @@ export async function generateAssistantReply(
   // ── Agent execution ──────────────────────────────────────────────
   agent = new Agent({
     getApiKey: () => getPiGatewayApiKeyOverride(),
+    streamFn: createTracedStreamFn(),
     initialState: {
       systemPrompt: baseInstructions,
       model: resolveGatewayModel(botConfig.modelId),
diff --git a/packages/junior/tests/unit/chat/pi/traced-stream.test.ts b/packages/junior/tests/unit/chat/pi/traced-stream.test.ts
new file mode 100644
index 00000000..2bbbd852
--- /dev/null
+++ b/packages/junior/tests/unit/chat/pi/traced-stream.test.ts
@@ -0,0 +1,207 @@
+import type { StreamFn } from "@mariozechner/pi-agent-core";
+import { afterEach, describe, expect, it, vi } from "vitest";
+import {
+  createAssistantMessageEventStream,
+  type AssistantMessage,
+  type Model,
+} from "@mariozechner/pi-ai";
+
+const { startInactiveSpan, withActiveSpan } = vi.hoisted(() => {
+  const span = {
+    setAttribute: vi.fn(),
+    setAttributes: vi.fn(),
+    setStatus: vi.fn(),
+    end: vi.fn(),
+  };
+  return {
+    startInactiveSpan: vi.fn((_options: unknown) => span),
+    withActiveSpan: vi.fn(<T>(_s: unknown, cb: () => T) => cb()),
+  };
+});
+
+vi.mock("@/chat/sentry", () => ({
+  startInactiveSpan,
+  withActiveSpan,
+}));
+
+function fakeModel(id: string): Model<"anthropic-messages"> {
+  return { id } as unknown as Model<"anthropic-messages">;
+}
+
+function fakeMessage(): AssistantMessage {
+  return {
+    role: "assistant",
+    content: [{ type: "text", text: "hi" }],
+    api: "anthropic-messages",
+    provider: "vercel-ai-gateway",
+    model: "openai/gpt-5.4",
+    usage: {
+      input: 100,
+      output: 5,
+      cacheRead: 0,
+      cacheWrite: 0,
+      totalTokens: 105,
+      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+    },
+    stopReason: "stop",
+    timestamp: Date.now(),
+  };
+}
+
+type SpanMock = {
+  setAttribute: ReturnType<typeof vi.fn>;
+  setAttributes: ReturnType<typeof vi.fn>;
+  setStatus: ReturnType<typeof vi.fn>;
+  end: ReturnType<typeof vi.fn>;
+};
+
+function getSpan(): SpanMock {
+  return startInactiveSpan.mock.results[0]!.value as SpanMock;
+}
+
+describe("createTracedStreamFn", () => {
+  afterEach(() => {
+    vi.clearAllMocks();
+    vi.resetModules();
+  });
+
+  it("opens a gen_ai.chat span when invoked", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    const returned = await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    expect(returned).toBe(stream);
+    expect(startInactiveSpan).toHaveBeenCalledTimes(1);
+    const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
+      name: string;
+      op: string;
+    };
+    expect(opts.op).toBe("gen_ai.chat");
+    expect(opts.name).toBe("chat openai/gpt-5.4");
+  });
+
+  it("sets gen_ai.input.messages and gen_ai.system_instructions on the chat span", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await traced(
+      fakeModel("openai/gpt-5.4"),
+      {
+        systemPrompt: "you are junior",
+        messages: [{ role: "user", content: "hello", timestamp: 0 }],
+      },
+      undefined,
+    );
+
+    const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
+      attributes: Record<string, unknown>;
+    };
+    expect(opts.attributes["gen_ai.provider.name"]).toBe("vercel-ai-gateway");
+    expect(typeof opts.attributes["gen_ai.input.messages"]).toBe("string");
+    expect(opts.attributes["gen_ai.input.messages"]).toContain("hello");
+    expect(typeof opts.attributes["gen_ai.system_instructions"]).toBe("string");
+    expect(opts.attributes["gen_ai.system_instructions"]).toContain(
+      "you are junior",
+    );
+    expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
+    expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
+  });
+
+  it("sets output.messages, usage tokens, finish_reasons, response.model after stream completion", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    const returned = await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    expect(returned).toBe(stream);
+
+    // Resolve the stream's terminal Promise to trigger end-attribute population.
+    const finalMessage = fakeMessage();
+    stream.end(finalMessage);
+    await stream.result();
+    // Allow the .then callback to flush.
+    await new Promise((r) => setImmediate(r));
+
+    const span = getSpan();
+    const endAttributes = Object.fromEntries(
+      span.setAttribute.mock.calls.map((c) => [c[0], c[1]]),
+    );
+    expect(typeof endAttributes["gen_ai.output.messages"]).toBe("string");
+    expect(endAttributes["gen_ai.usage.input_tokens"]).toBe(100);
+    expect(endAttributes["gen_ai.usage.output_tokens"]).toBe(5);
+    expect(endAttributes["gen_ai.response.finish_reasons"]).toEqual(["stop"]);
+    expect(endAttributes["gen_ai.response.model"]).toBe("openai/gpt-5.4");
+    expect(span.end).toHaveBeenCalledTimes(1);
+  });
+
+  it("inherits LogContext attributes (e.g. gen_ai.conversation.id) onto the chat span", async () => {
+    const { withLogContext } = await import("@/chat/logging");
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+
+    await withLogContext(
+      { conversationId: "conv_123", turnId: "turn_456" },
+      async () => {
+        await traced(
+          fakeModel("openai/gpt-5.4"),
+          { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+          undefined,
+        );
+      },
+    );
+
+    const opts = startInactiveSpan.mock.calls[0]?.[0] as {
+      attributes: Record<string, unknown>;
+    };
+    expect(opts.attributes["gen_ai.conversation.id"]).toBe("conv_123");
+    expect(opts.attributes["app.turn.id"]).toBe("turn_456");
+    // Wrapper-supplied attributes are still present.
+    expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
+    expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
+  });
+
+  it("ends the span when the stream errors", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    // pi-ai's AssistantMessageEventStream resolves `result()` with the carrier
+    // AssistantMessage on `error` events instead of rejecting, so the wrapper's
+    // `.then` success arm runs on the error path. The load-bearing invariant
+    // is that the span ends exactly once.
+    const errorMessage = { ...fakeMessage(), stopReason: "error" as const };
+    stream.push({ type: "error", reason: "error", error: errorMessage });
+    await stream.result();
+    await new Promise((r) => setImmediate(r));
+
+    const span = getSpan();
+    expect(span.end).toHaveBeenCalledTimes(1);
+    // End attributes are still populated because the success arm runs.
+    const endAttributeKeys = span.setAttribute.mock.calls.map((c) => c[0]);
+    expect(endAttributeKeys).toContain("gen_ai.output.messages");
+  });
+});
diff --git a/specs/logging/tracing-spec.md b/specs/logging/tracing-spec.md
index cbc829b5..5c4cf398 100644
--- a/specs/logging/tracing-spec.md
+++ b/specs/logging/tracing-spec.md
@@ -3,7 +3,7 @@
 ## Metadata
 
 - Created: 2026-02-25
-- Last Edited: 2026-05-01
+- Last Edited: 2026-05-06
 
 ## Changelog
 
@@ -14,6 +14,7 @@
 - 2026-04-06: Added official GenAI finish-reasons, system-instructions, and tool-description tracing attributes.
 - 2026-04-28: Added MCP tool-call span attributes from the OpenTelemetry MCP semantic conventions.
 - 2026-05-01: Added `gen_ai.conversation.id` as required correlation context for GenAI spans when available.
+- 2026-05-06: Documented the `gen_ai.invoke_agent` / `gen_ai.chat` span hierarchy rule.
 
 ## Status
 
@@ -182,6 +183,14 @@ semantic conventions:
 - Sandbox spans should be nested under `ai.generate_assistant_reply` when invoked during reply generation.
 - Sandbox execution spans should be nested under the active tool-call/request span context.
 
+### GenAI Span Hierarchy
+
+- A `gen_ai.invoke_agent` span MUST have at least one `gen_ai.chat` child covering the LLM call(s) issued during its agent loop.
+- A `gen_ai.chat` span MAY appear at the top level (as a sibling of `gen_ai.invoke_agent`, or under a non-`gen_ai.*` parent such as `chat.route_thinking`) only when it represents an LLM call that is independent of an agent loop, for example a routing or classification pre-flight (see the sketch after this list).
+- Every `gen_ai.chat` span MUST carry `gen_ai.input.messages` and `gen_ai.output.messages`.
+- The parent `gen_ai.invoke_agent` MAY also carry `gen_ai.input.messages` / `gen_ai.output.messages` as a high-level rollup; this is optional.
+- The per-iteration `gen_ai.chat` child span is created in `packages/junior/src/chat/pi/traced-stream.ts` via the `streamFn` injected into `pi-agent-core`'s `Agent`.
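+
+For illustration, a conforming trace might nest spans as in the sketch below
+(`chat.route_thinking` stands in for any non-`gen_ai.*` parent of a
+loop-independent call):
+
+```
+ai.generate_assistant_reply
+├── gen_ai.invoke_agent
+│   ├── gen_ai.chat        (agent loop, iteration 1)
+│   └── gen_ai.chat        (agent loop, iteration 2)
+└── chat.route_thinking
+    └── gen_ai.chat        (loop-independent routing pre-flight)
+```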
+
 ## Rollout Guidance
 
 - Start with lifecycle + I/O spans.