119 changes: 119 additions & 0 deletions packages/junior/src/chat/pi/traced-stream.ts
@@ -0,0 +1,119 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import {
type Api,
type AssistantMessage,
type Context,
type Model,
streamSimple,
} from "@mariozechner/pi-ai";
import * as Sentry from "@/chat/sentry";
import {
extractGenAiUsageAttributes,
getLogContextAttributes,
serializeGenAiAttribute,
} from "@/chat/logging";

const PROVIDER_NAME = "vercel-ai-gateway";

// Composes only the OTel GenAI attributes that are knowable at span start
// (request-shape + system instructions). End-of-call attributes such as
// usage and finish reasons are set after the stream resolves.
function buildChatStartAttributes(
model: Model<Api>,
context: Context,
): Record<string, string> {
const attributes: Record<string, string> = {
"gen_ai.operation.name": "chat",
"gen_ai.provider.name": PROVIDER_NAME,
"gen_ai.request.model": model.id,
};

const inputMessages = serializeGenAiAttribute(context.messages);
if (inputMessages) {
attributes["gen_ai.input.messages"] = inputMessages;
}

if (context.systemPrompt) {
const systemInstructions = serializeGenAiAttribute([
{ type: "text", content: context.systemPrompt },
]);
if (systemInstructions) {
attributes["gen_ai.system_instructions"] = systemInstructions;
}
}

return attributes;
}

// Composes post-stream attributes for the chat span. Two deferrals worth
// flagging: `extractGenAiUsageAttributes` currently omits cache token fields,
// and `gen_ai.response.finish_reasons` is emitted with pi-ai's raw StopReason
// (Sentry ingest expects the OTel set). Both are tracked separately and out
// of scope here.
function buildChatEndAttributes(
message: AssistantMessage,
): Record<string, string | string[] | number> {
const attributes: Record<string, string | string[] | number> = {};

const outputMessages = serializeGenAiAttribute([message]);
if (outputMessages) {
attributes["gen_ai.output.messages"] = outputMessages;
}

Object.assign(attributes, extractGenAiUsageAttributes(message));

if (message.stopReason) {
attributes["gen_ai.response.finish_reasons"] = [message.stopReason];
}

if (message.model) {
attributes["gen_ai.response.model"] = message.model;
}

return attributes;
}

/**
* Wraps pi-ai's `streamSimple` so each LLM call inside a pi-agent-core agent
* loop produces its own `gen_ai.chat` Sentry span. The returned function is
* passed to `new Agent({ streamFn: ... })` and runs once per loop iteration.
*
* The base argument exists so tests can inject a stub stream function.
*/
export function createTracedStreamFn(base: StreamFn = streamSimple): StreamFn {
return async (model, context, options) => {
const span = Sentry.startInactiveSpan({
name: `chat ${model.id}`,
op: "gen_ai.chat",
attributes: {
...getLogContextAttributes(),
...buildChatStartAttributes(model, context),
},
});

try {
const stream = await Sentry.withActiveSpan(span, () =>
Promise.resolve(base(model, context, options)),
);

stream.result().then(
(finalMessage) => {
for (const [key, value] of Object.entries(
buildChatEndAttributes(finalMessage),
)) {
span.setAttribute(key, value);
}
span.end();
},
() => {
span.end();
},
);
Span leaks if success callback throws unexpectedly

Low Severity

The stream.result().then(successHandler, rejectionHandler) creates a floating promise. If any statement inside the success handler (e.g., buildChatEndAttributes or span.setAttribute) throws before span.end() is reached, the span never ends (leaks) and the error becomes an unhandled promise rejection. Wrapping the success handler body in try/finally with span.end() in finally would ensure the span always closes, matching the defensive style of the catch block at line 114.
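
A minimal sketch of the suggested reshuffle (names taken from the diff above; not the committed fix):

```ts
// Sketch only: same logic as the diff, but the success arm is wrapped in
// try/finally so the span closes even if buildChatEndAttributes or
// setAttribute throws unexpectedly.
stream.result().then(
  (finalMessage) => {
    try {
      for (const [key, value] of Object.entries(
        buildChatEndAttributes(finalMessage),
      )) {
        span.setAttribute(key, value);
      }
    } finally {
      span.end();
    }
  },
  () => {
    span.end();
  },
);
```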


Reviewed by Cursor Bugbot for commit a9661dd.


return stream;
} catch (error) {
span.end();
throw error;
}

Error spans lack status, appearing successful in Sentry

Medium Severity

When base() throws (catch block) or stream.result() rejects (rejection handler), the span is ended without setting its status to error. Unlike Sentry.startSpan (used elsewhere via withSpan), startInactiveSpan has no automatic error-status propagation. Failed LLM calls will appear as successful in Sentry's trace waterfall, undermining the tracing improvement this PR aims to deliver. The same gap exists for the success path when stopReason is "error".
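
A minimal sketch of the suggested status handling in the catch block, assuming a Sentry v8-style `span.setStatus` (the exact signature depends on the wrapper exported from `@/chat/sentry`; the test's span mock does stub `setStatus`):

```ts
} catch (error) {
  // Sketch: mark the span as failed before ending it so Sentry's trace
  // waterfall shows the error. { code: 2 } is the v8 SDK's
  // SPAN_STATUS_ERROR; adjust to whatever @/chat/sentry re-exports.
  span.setStatus({ code: 2, message: "internal_error" });
  span.end();
  throw error;
}
```

The same call would apply in the rejection handler, and conditionally in the success arm when `stopReason` is `"error"`.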


Reviewed by Cursor Bugbot for commit a9661dd.

};
}
2 changes: 2 additions & 0 deletions packages/junior/src/chat/respond.ts
@@ -50,6 +50,7 @@ import {
resolveGatewayModel,
} from "@/chat/pi/client";
import type { PiMessage } from "@/chat/pi/messages";
import { createTracedStreamFn } from "@/chat/pi/traced-stream";
import {
createSandboxExecutor,
type SandboxAcquiredState,
@@ -918,6 +919,7 @@ export async function generateAssistantReply(
// ── Agent execution ──────────────────────────────────────────────
agent = new Agent({
getApiKey: () => getPiGatewayApiKeyOverride(),
streamFn: createTracedStreamFn(),
initialState: {
systemPrompt: baseInstructions,
model: resolveGatewayModel(botConfig.modelId),
207 changes: 207 additions & 0 deletions packages/junior/tests/unit/chat/pi/traced-stream.test.ts
@@ -0,0 +1,207 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
createAssistantMessageEventStream,
type AssistantMessage,
type Model,
} from "@mariozechner/pi-ai";

const { startInactiveSpan, withActiveSpan } = vi.hoisted(() => {
const span = {
setAttribute: vi.fn(),
setAttributes: vi.fn(),
setStatus: vi.fn(),
end: vi.fn(),
};
return {
startInactiveSpan: vi.fn((_options: unknown) => span),
withActiveSpan: vi.fn(<T>(_s: unknown, cb: () => T) => cb()),
};
});

vi.mock("@/chat/sentry", () => ({
startInactiveSpan,
withActiveSpan,
}));

function fakeModel(id: string): Model<"anthropic-messages"> {
return { id } as unknown as Model<"anthropic-messages">;
}

function fakeMessage(): AssistantMessage {
return {
role: "assistant",
content: [{ type: "text", text: "hi" }],
api: "anthropic-messages",
provider: "vercel-ai-gateway",
model: "openai/gpt-5.4",
usage: {
input: 100,
output: 5,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 105,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: Date.now(),
};
}

type SpanMock = {
setAttribute: ReturnType<typeof vi.fn>;
setAttributes: ReturnType<typeof vi.fn>;
setStatus: ReturnType<typeof vi.fn>;
end: ReturnType<typeof vi.fn>;
};

function getSpan(): SpanMock {
return startInactiveSpan.mock.results[0]!.value as SpanMock;
}

describe("createTracedStreamFn", () => {
afterEach(() => {
vi.clearAllMocks();
vi.resetModules();
});

it("opens a gen_ai.chat span when invoked", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
const returned = await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);

expect(returned).toBe(stream);
expect(startInactiveSpan).toHaveBeenCalledTimes(1);
const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
name: string;
op: string;
};
expect(opts.op).toBe("gen_ai.chat");
expect(opts.name).toBe("chat openai/gpt-5.4");
});

it("sets gen_ai.input.messages and gen_ai.system_instructions on the chat span", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
await traced(
fakeModel("openai/gpt-5.4"),
{
systemPrompt: "you are junior",
messages: [{ role: "user", content: "hello", timestamp: 0 }],
},
undefined,
);

const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
attributes: Record<string, unknown>;
};
expect(opts.attributes["gen_ai.provider.name"]).toBe("vercel-ai-gateway");
expect(typeof opts.attributes["gen_ai.input.messages"]).toBe("string");
expect(opts.attributes["gen_ai.input.messages"]).toContain("hello");
expect(typeof opts.attributes["gen_ai.system_instructions"]).toBe("string");
expect(opts.attributes["gen_ai.system_instructions"]).toContain(
"you are junior",
);
expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
});

it("sets output.messages, usage tokens, finish_reasons, response.model after stream completion", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
const returned = await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);

expect(returned).toBe(stream);

// Resolve the stream's terminal Promise to trigger end-attribute population.
const finalMessage = fakeMessage();
stream.end(finalMessage);
await stream.result();
// Allow the .then callback to flush.
await new Promise((r) => setImmediate(r));

const span = getSpan();
const endAttributes = Object.fromEntries(
span.setAttribute.mock.calls.map((c) => [c[0], c[1]]),
);
expect(typeof endAttributes["gen_ai.output.messages"]).toBe("string");
expect(endAttributes["gen_ai.usage.input_tokens"]).toBe(100);
expect(endAttributes["gen_ai.usage.output_tokens"]).toBe(5);
expect(endAttributes["gen_ai.response.finish_reasons"]).toEqual(["stop"]);
expect(endAttributes["gen_ai.response.model"]).toBe("openai/gpt-5.4");
expect(span.end).toHaveBeenCalledTimes(1);
});

it("inherits LogContext attributes (e.g. gen_ai.conversation.id) onto the chat span", async () => {
const { withLogContext } = await import("@/chat/logging");
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);
const traced = createTracedStreamFn(base as unknown as StreamFn);

await withLogContext(
{ conversationId: "conv_123", turnId: "turn_456" },
async () => {
await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);
},
);

const opts = startInactiveSpan.mock.calls[0]?.[0] as {
attributes: Record<string, unknown>;
};
expect(opts.attributes["gen_ai.conversation.id"]).toBe("conv_123");
expect(opts.attributes["app.turn.id"]).toBe("turn_456");
// wrapper-supplied attributes still present
expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
});

it("ends the span when the stream errors", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);

// pi-ai's AssistantMessageEventStream resolves `result()` with the carrier
// AssistantMessage on `error` events instead of rejecting, so the wrapper's
// `.then` success arm runs on the error path. The load-bearing invariant
// is that the span ends exactly once.
const errorMessage = { ...fakeMessage(), stopReason: "error" as const };
stream.push({ type: "error", reason: "error", error: errorMessage });
await stream.result();
await new Promise((r) => setImmediate(r));

const span = getSpan();
expect(span.end).toHaveBeenCalledTimes(1);
// End attributes are still populated because the success arm runs.
const endAttributeKeys = span.setAttribute.mock.calls.map((c) => c[0]);
expect(endAttributeKeys).toContain("gen_ai.output.messages");
});
});
11 changes: 10 additions & 1 deletion specs/logging/tracing-spec.md
@@ -3,7 +3,7 @@
## Metadata

- Created: 2026-02-25
- Last Edited: 2026-05-01
- Last Edited: 2026-05-06

## Changelog

@@ -14,6 +14,7 @@
- 2026-04-06: Added official GenAI finish-reasons, system-instructions, and tool-description tracing attributes.
- 2026-04-28: Added MCP tool-call span attributes from the OpenTelemetry MCP semantic conventions.
- 2026-05-01: Added `gen_ai.conversation.id` as required correlation context for GenAI spans when available.
- 2026-05-06: Documented the `gen_ai.invoke_agent` / `gen_ai.chat` span hierarchy rule.

## Status

@@ -182,6 +183,14 @@ semantic conventions:
- Sandbox spans should be nested under `ai.generate_assistant_reply` when invoked during reply generation.
- Sandbox execution spans should be nested under the active tool-call/request span context.

### GenAI Span Hierarchy

- A `gen_ai.invoke_agent` span MUST have at least one `gen_ai.chat` child covering the LLM call(s) issued during its agent loop.
- A `gen_ai.chat` span MAY appear at the top level (as a sibling of `gen_ai.invoke_agent`, or under a non-`gen_ai.*` parent such as `chat.route_thinking`) only when it represents an LLM call that is independent of an agent loop, for example a routing or classification pre-flight.
- Every `gen_ai.chat` span MUST carry `gen_ai.input.messages` and `gen_ai.output.messages`.
- The parent `gen_ai.invoke_agent` MAY also carry `gen_ai.input.messages` / `gen_ai.output.messages` as a high-level rollup; this is optional.
- The per-iteration `gen_ai.chat` child span is created in `packages/junior/src/chat/pi/traced-stream.ts` via the `streamFn` injected into `pi-agent-core`'s `Agent`.
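
An illustrative trace shape under these rules (span names taken from this spec; the tree is an example, not an exhaustive contract):

```
ai.generate_assistant_reply
└─ gen_ai.invoke_agent
   ├─ gen_ai.chat          (agent loop iteration 1)
   └─ gen_ai.chat          (agent loop iteration 2)
chat.route_thinking
└─ gen_ai.chat             (routing pre-flight, independent of the agent loop)
```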

## Rollout Guidance

- Start with lifecycle + I/O spans.